HDDS-881.009. Encapsulate all client to OM requests into one request message. Contributed by Hanisha Koneru.

This commit is contained in:
Bharat Viswanadham 2018-12-20 09:12:42 -08:00
parent 3961690037
commit 13d3f99b37
6 changed files with 649 additions and 493 deletions

View File

@ -70,6 +70,7 @@
.StorageContainerLocationProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.util.Strings;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -104,6 +105,7 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferMaxSize;
private final long blockSize;
private final long watchTimeout;
private ClientId clientId = ClientId.randomId();
/**
* Creates RpcClient instance with the given configuration.
@ -129,7 +131,7 @@ public RpcClient(Configuration conf) throws IOException {
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
omAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
Client.getRpcTimeout(conf)), clientId.toString());
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);

View File

@ -19,9 +19,9 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.util.ArrayList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
@ -40,10 +40,18 @@
.OzoneManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.CreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.CreateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.CommitKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.DeleteKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.DeleteKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.BucketArgs;
import org.apache.hadoop.ozone.protocol.proto
@ -69,9 +77,9 @@
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.LocateKeyRequest;
.OzoneManagerProtocolProtos.LookupKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.LocateKeyResponse;
.OzoneManagerProtocolProtos.LookupKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -125,9 +133,9 @@
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.ServiceListResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.S3BucketRequest;
.OzoneManagerProtocolProtos.S3CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.S3BucketResponse;
.OzoneManagerProtocolProtos.S3CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.S3DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto
@ -140,14 +148,15 @@
.S3ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.Collectors;
/**
@ -164,14 +173,16 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
private static final RpcController NULL_RPC_CONTROLLER = null;
private final OzoneManagerProtocolPB rpcProxy;
private final String clientID;
/**
* Constructor for KeySpaceManger Client.
* @param rpcProxy
*/
public OzoneManagerProtocolClientSideTranslatorPB(
OzoneManagerProtocolPB rpcProxy) {
OzoneManagerProtocolPB rpcProxy, String clientId) {
this.rpcProxy = rpcProxy;
this.clientID = clientId;
}
/**
@ -192,6 +203,41 @@ public void close() throws IOException {
}
/**
* Return the proxy object underlying this protocol translator.
*
* @return the proxy object underlying this protocol translator.
*/
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
/**
* Returns a OMRequest builder with specified type.
* @param cmdType type of the request
*/
private OMRequest.Builder createOMRequest(Type cmdType) {
return OMRequest.newBuilder()
.setCmdType(cmdType)
.setClientId(clientID);
}
/**
* Submits client request to OM server.
* @param omRequest client request
* @return response from OM
* @throws IOException thrown if any Protobuf service exception occurs
*/
private OMResponse submitRequest(OMRequest omRequest)
throws IOException {
try {
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, omRequest);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
/**
* Creates a volume.
*
@ -205,13 +251,12 @@ public void createVolume(OmVolumeArgs args) throws IOException {
VolumeInfo volumeInfo = args.getProtobuf();
req.setVolumeInfo(volumeInfo);
final CreateVolumeResponse resp;
try {
resp = rpcProxy.createVolume(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.CreateVolume)
.setCreateVolumeRequest(req)
.build();
CreateVolumeResponse resp = submitRequest(omRequest)
.getCreateVolumeResponse();
if (resp.getStatus() != Status.OK) {
throw new
@ -231,12 +276,14 @@ public void setOwner(String volume, String owner) throws IOException {
SetVolumePropertyRequest.Builder req =
SetVolumePropertyRequest.newBuilder();
req.setVolumeName(volume).setOwnerName(owner);
final SetVolumePropertyResponse resp;
try {
resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.SetVolumeProperty)
.setSetVolumePropertyRequest(req)
.build();
SetVolumePropertyResponse resp = submitRequest(omRequest)
.getSetVolumePropertyResponse();
if (resp.getStatus() != Status.OK) {
throw new
IOException("Volume owner change failed, error:" + resp.getStatus());
@ -255,12 +302,14 @@ public void setQuota(String volume, long quota) throws IOException {
SetVolumePropertyRequest.Builder req =
SetVolumePropertyRequest.newBuilder();
req.setVolumeName(volume).setQuotaInBytes(quota);
final SetVolumePropertyResponse resp;
try {
resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.SetVolumeProperty)
.setSetVolumePropertyRequest(req)
.build();
SetVolumePropertyResponse resp = submitRequest(omRequest)
.getSetVolumePropertyResponse();
if (resp.getStatus() != Status.OK) {
throw new
IOException("Volume quota change failed, error:" + resp.getStatus());
@ -282,12 +331,13 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws
CheckVolumeAccessRequest.Builder req =
CheckVolumeAccessRequest.newBuilder();
req.setVolumeName(volume).setUserAcl(userAcl);
final CheckVolumeAccessResponse resp;
try {
resp = rpcProxy.checkVolumeAccess(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.CheckVolumeAccess)
.setCheckVolumeAccessRequest(req)
.build();
CheckVolumeAccessResponse resp = submitRequest(omRequest)
.getCheckVolumeAccessResponse();
if (resp.getStatus() == Status.ACCESS_DENIED) {
return false;
@ -310,12 +360,13 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws
public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
InfoVolumeRequest.Builder req = InfoVolumeRequest.newBuilder();
req.setVolumeName(volume);
final InfoVolumeResponse resp;
try {
resp = rpcProxy.infoVolume(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.InfoVolume)
.setInfoVolumeRequest(req)
.build();
InfoVolumeResponse resp = submitRequest(omRequest).getInfoVolumeResponse();
if (resp.getStatus() != Status.OK) {
throw new
IOException("Info Volume failed, error:" + resp.getStatus());
@ -333,12 +384,14 @@ public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
public void deleteVolume(String volume) throws IOException {
DeleteVolumeRequest.Builder req = DeleteVolumeRequest.newBuilder();
req.setVolumeName(volume);
final DeleteVolumeResponse resp;
try {
resp = rpcProxy.deleteVolume(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.DeleteVolume)
.setDeleteVolumeRequest(req)
.build();
DeleteVolumeResponse resp = submitRequest(omRequest)
.getDeleteVolumeResponse();
if (resp.getStatus() != Status.OK) {
throw new
IOException("Delete Volume failed, error:" + resp.getStatus());
@ -400,24 +453,18 @@ public List<OmVolumeArgs> listAllVolumes(String prefix, String prevKey,
private List<OmVolumeArgs> listVolume(ListVolumeRequest request)
throws IOException {
final ListVolumeResponse resp;
try {
resp = rpcProxy.listVolumes(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.ListVolume)
.setListVolumeRequest(request)
.build();
ListVolumeResponse resp = submitRequest(omRequest).getListVolumeResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("List volume failed, error: "
+ resp.getStatus());
}
List<OmVolumeArgs> result = Lists.newArrayList();
for (VolumeInfo volInfo : resp.getVolumeInfoList()) {
OmVolumeArgs volArgs = OmVolumeArgs.getFromProtobuf(volInfo);
result.add(volArgs);
}
return resp.getVolumeInfoList().stream()
.map(item -> OmVolumeArgs.getFromProtobuf(item))
.collect(Collectors.toList());
@ -436,13 +483,13 @@ public void createBucket(OmBucketInfo bucketInfo) throws IOException {
BucketInfo bucketInfoProtobuf = bucketInfo.getProtobuf();
req.setBucketInfo(bucketInfoProtobuf);
final CreateBucketResponse resp;
try {
resp = rpcProxy.createBucket(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.CreateBucket)
.setCreateBucketRequest(req)
.build();
CreateBucketResponse resp = submitRequest(omRequest)
.getCreateBucketResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Bucket creation failed, error: "
+ resp.getStatus());
@ -465,13 +512,12 @@ public OmBucketInfo getBucketInfo(String volume, String bucket)
req.setVolumeName(volume);
req.setBucketName(bucket);
final InfoBucketResponse resp;
try {
resp = rpcProxy.infoBucket(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.InfoBucket)
.setInfoBucketRequest(req)
.build();
InfoBucketResponse resp = submitRequest(omRequest).getInfoBucketResponse();
if (resp.getStatus() == Status.OK) {
return OmBucketInfo.getFromProtobuf(resp.getBucketInfo());
} else {
@ -492,13 +538,14 @@ public void setBucketProperty(OmBucketArgs args)
SetBucketPropertyRequest.newBuilder();
BucketArgs bucketArgs = args.getProtobuf();
req.setBucketArgs(bucketArgs);
final SetBucketPropertyResponse resp;
try {
resp = rpcProxy.setBucketProperty(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.SetBucketProperty)
.setSetBucketPropertyRequest(req)
.build();
SetBucketPropertyResponse resp = submitRequest(omRequest)
.getSetBucketPropertyResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Setting bucket property failed, error: "
+ resp.getStatus());
@ -529,12 +576,13 @@ public List<OmBucketInfo> listBuckets(String volumeName,
reqBuilder.setPrefix(prefix);
}
ListBucketsRequest request = reqBuilder.build();
final ListBucketsResponse resp;
try {
resp = rpcProxy.listBuckets(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.ListBuckets)
.setListBucketsRequest(request)
.build();
ListBucketsResponse resp = submitRequest(omRequest)
.getListBucketsResponse();
if (resp.getStatus() == Status.OK) {
buckets.addAll(
@ -557,7 +605,7 @@ public List<OmBucketInfo> listBuckets(String volumeName,
*/
@Override
public OpenKeySession openKey(OmKeyArgs args) throws IOException {
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
@ -584,12 +632,12 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
req.setKeyArgs(keyArgs.build());
final LocateKeyResponse resp;
try {
resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.CreateKey)
.setCreateKeyRequest(req)
.build();
CreateKeyResponse resp = submitRequest(omRequest).getCreateKeyResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Create key failed, error:" + resp.getStatus());
}
@ -598,7 +646,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
}
@Override
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId)
throws IOException {
AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
@ -607,14 +655,15 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs);
req.setClientID(clientID);
req.setClientID(clientId);
OMRequest omRequest = createOMRequest(Type.AllocateBlock)
.setAllocateBlockRequest(req)
.build();
AllocateBlockResponse resp = submitRequest(omRequest)
.getAllocateBlockResponse();
final AllocateBlockResponse resp;
try {
resp = rpcProxy.allocateBlock(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Allocate block failed, error:" +
resp.getStatus());
@ -623,7 +672,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
}
@Override
public void commitKey(OmKeyArgs args, long clientID)
public void commitKey(OmKeyArgs args, long clientId)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
@ -637,14 +686,14 @@ public void commitKey(OmKeyArgs args, long clientID)
locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf)
.collect(Collectors.toList())).build();
req.setKeyArgs(keyArgs);
req.setClientID(clientID);
req.setClientID(clientId);
OMRequest omRequest = createOMRequest(Type.CommitKey)
.setCommitKeyRequest(req)
.build();
CommitKeyResponse resp = submitRequest(omRequest).getCommitKeyResponse();
final CommitKeyResponse resp;
try {
resp = rpcProxy.commitKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Commit key failed, error:" +
resp.getStatus());
@ -654,7 +703,7 @@ public void commitKey(OmKeyArgs args, long clientID)
@Override
public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
LookupKeyRequest.Builder req = LookupKeyRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
@ -662,12 +711,12 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
.setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs);
final LocateKeyResponse resp;
try {
resp = rpcProxy.lookupKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.LookupKey)
.setLookupKeyRequest(req)
.build();
LookupKeyResponse resp = submitRequest(omRequest).getLookupKeyResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Lookup key failed, error:" +
resp.getStatus());
@ -686,12 +735,12 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
req.setKeyArgs(keyArgs);
req.setToKeyName(toKeyName);
final RenameKeyResponse resp;
try {
resp = rpcProxy.renameKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.RenameKey)
.setRenameKeyRequest(req)
.build();
RenameKeyResponse resp = submitRequest(omRequest).getRenameKeyResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Rename key failed, error:" +
resp.getStatus());
@ -706,19 +755,19 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
*/
@Override
public void deleteKey(OmKeyArgs args) throws IOException {
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
DeleteKeyRequest.Builder req = DeleteKeyRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName()).build();
req.setKeyArgs(keyArgs);
final LocateKeyResponse resp;
try {
resp = rpcProxy.deleteKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.DeleteKey)
.setDeleteKeyRequest(req)
.build();
DeleteKeyResponse resp = submitRequest(omRequest).getDeleteKeyResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Delete key failed, error:" +
resp.getStatus());
@ -735,12 +784,14 @@ public void deleteBucket(String volume, String bucket) throws IOException {
DeleteBucketRequest.Builder req = DeleteBucketRequest.newBuilder();
req.setVolumeName(volume);
req.setBucketName(bucket);
final DeleteBucketResponse resp;
try {
resp = rpcProxy.deleteBucket(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.DeleteBucket)
.setDeleteBucketRequest(req)
.build();
DeleteBucketResponse resp = submitRequest(omRequest)
.getDeleteBucketResponse();
if (resp.getStatus() != Status.OK) {
throw new
IOException("Delete Bucket failed, error:" + resp.getStatus());
@ -767,13 +818,13 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
reqBuilder.setPrefix(prefix);
}
ListKeysRequest request = reqBuilder.build();
final ListKeysResponse resp;
try {
resp = rpcProxy.listKeys(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ListKeysRequest req = reqBuilder.build();
OMRequest omRequest = createOMRequest(Type.ListKeys)
.setListKeysRequest(req)
.build();
ListKeysResponse resp = submitRequest(omRequest).getListKeysResponse();
if (resp.getStatus() == Status.OK) {
keys.addAll(
@ -787,39 +838,20 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
}
}
@Override
public List<ServiceInfo> getServiceList() throws IOException {
ServiceListRequest request = ServiceListRequest.newBuilder().build();
final ServiceListResponse resp;
try {
resp = rpcProxy.getServiceList(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() == Status.OK) {
return resp.getServiceInfoList().stream()
.map(ServiceInfo::getFromProtobuf)
.collect(Collectors.toList());
} else {
throw new IOException("Getting service list failed, error: "
+ resp.getStatus());
}
}
@Override
public void createS3Bucket(String userName, String s3BucketName)
throws IOException {
S3BucketRequest request = S3BucketRequest.newBuilder()
S3CreateBucketRequest req = S3CreateBucketRequest.newBuilder()
.setUserName(userName)
.setS3Bucketname(s3BucketName)
.build();
final S3BucketResponse resp;
try {
resp = rpcProxy.createS3Bucket(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.CreateS3Bucket)
.setCreateS3BucketRequest(req)
.build();
S3CreateBucketResponse resp = submitRequest(omRequest)
.getCreateS3BucketResponse();
if(resp.getStatus() != Status.OK) {
throw new IOException("Creating S3 bucket failed, error: "
@ -833,12 +865,13 @@ public void deleteS3Bucket(String s3BucketName) throws IOException {
S3DeleteBucketRequest request = S3DeleteBucketRequest.newBuilder()
.setS3BucketName(s3BucketName)
.build();
final S3DeleteBucketResponse resp;
try {
resp = rpcProxy.deleteS3Bucket(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.DeleteS3Bucket)
.setDeleteS3BucketRequest(request)
.build();
S3DeleteBucketResponse resp = submitRequest(omRequest)
.getDeleteS3BucketResponse();
if(resp.getStatus() != Status.OK) {
throw new IOException("Creating S3 bucket failed, error: "
@ -853,12 +886,14 @@ public String getOzoneBucketMapping(String s3BucketName)
S3BucketInfoRequest request = S3BucketInfoRequest.newBuilder()
.setS3BucketName(s3BucketName)
.build();
final S3BucketInfoResponse resp;
try {
resp = rpcProxy.getS3Bucketinfo(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.InfoS3Bucket)
.setInfoS3BucketRequest(request)
.build();
S3BucketInfoResponse resp = submitRequest(omRequest)
.getInfoS3BucketResponse();
if(resp.getStatus() != Status.OK) {
throw new IOException("GetOzoneBucketMapping failed, error:" + resp
.getStatus());
@ -881,12 +916,13 @@ public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
reqBuilder.setPrefix(prefix);
}
S3ListBucketsRequest request = reqBuilder.build();
final S3ListBucketsResponse resp;
try {
resp = rpcProxy.listS3Buckets(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(Type.ListS3Buckets)
.setListS3BucketsRequest(request)
.build();
S3ListBucketsResponse resp = submitRequest(omRequest)
.getListS3BucketsResponse();
if (resp.getStatus() == Status.OK) {
buckets.addAll(
@ -900,16 +936,6 @@ public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
}
}
/**
* Return the proxy object underlying this protocol translator.
*
* @return the proxy object underlying this protocol translator.
*/
@Override
public Object getUnderlyingProxyObject() {
return null;
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
IOException {
@ -925,13 +951,14 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
.setType(omKeyArgs.getType());
multipartInfoInitiateRequest.setKeyArgs(keyArgs.build());
MultipartInfoInitiateResponse resp;
try {
resp = rpcProxy.initiateMultiPartUpload(NULL_RPC_CONTROLLER,
multipartInfoInitiateRequest.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
OMRequest omRequest = createOMRequest(
Type.InitiateMultiPartUpload)
.setInitiateMultiPartUploadRequest(multipartInfoInitiateRequest.build())
.build();
MultipartInfoInitiateResponse resp = submitRequest(omRequest)
.getInitiateMultiPartUploadResponse();
if (resp.getStatus() != Status.OK) {
throw new IOException("Initiate Multipart upload failed, error:" + resp
.getStatus());
@ -942,7 +969,7 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
@Override
public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
OmKeyArgs omKeyArgs, long clientID) throws IOException {
OmKeyArgs omKeyArgs, long clientId) throws IOException {
MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest
= MultipartCommitUploadPartRequest.newBuilder();
@ -953,18 +980,17 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
.setMultipartUploadID(omKeyArgs.getMultipartUploadID())
.setIsMultipartKey(omKeyArgs.getIsMultipartKey())
.setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber());
multipartCommitUploadPartRequest.setClientID(clientID);
multipartCommitUploadPartRequest.setClientID(clientId);
multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build());
MultipartCommitUploadPartResponse response;
OMRequest omRequest = createOMRequest(
Type.CommitMultiPartUpload)
.setCommitMultiPartUploadRequest(multipartCommitUploadPartRequest
.build())
.build();
try {
response = rpcProxy.commitMultipartUploadPart(NULL_RPC_CONTROLLER,
multipartCommitUploadPartRequest.build());
} catch (ServiceException ex) {
throw ProtobufHelper.getRemoteException(ex);
}
MultipartCommitUploadPartResponse response = submitRequest(omRequest)
.getCommitMultiPartUploadResponse();
if (response.getStatus() != Status.OK) {
throw new IOException("Commit multipart upload part key failed, error:"
@ -975,4 +1001,24 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
OmMultipartCommitUploadPartInfo(response.getPartName());
return info;
}
public List<ServiceInfo> getServiceList() throws IOException {
ServiceListRequest req = ServiceListRequest.newBuilder().build();
OMRequest omRequest = createOMRequest(Type.ServiceList)
.setServiceListRequest(req)
.build();
final ServiceListResponse resp = submitRequest(omRequest)
.getServiceListResponse();
if (resp.getStatus() == Status.OK) {
return resp.getServiceInfoList().stream()
.map(ServiceInfo::getFromProtobuf)
.collect(Collectors.toList());
} else {
throw new IOException("Getting service list failed, error: "
+ resp.getStatus());
}
}
}

View File

@ -22,6 +22,7 @@
* for what changes are allowed for a *unstable* .proto interface.
*/
syntax = "proto2";
option java_package = "org.apache.hadoop.ozone.protocol.proto";
option java_outer_classname = "OzoneManagerProtocolProtos";
option java_generic_services = true;
@ -29,13 +30,126 @@ option java_generate_equals_and_hash = true;
package hadoop.ozone;
/**
This is file contains the protocol to communicate with
This file contains the protocol to communicate with
Ozone Manager. Ozone Manager manages the namespace for ozone.
This is similar to Namenode for Ozone.
*/
import "hdds.proto";
enum Type {
CreateVolume = 11;
SetVolumeProperty = 12;
CheckVolumeAccess = 13;
InfoVolume = 14;
DeleteVolume = 15;
ListVolume = 16;
CreateBucket = 21;
InfoBucket = 22;
SetBucketProperty = 23;
DeleteBucket = 24;
ListBuckets = 25;
CreateKey = 31;
LookupKey = 32;
RenameKey = 33;
DeleteKey = 34;
ListKeys = 35;
CommitKey = 36;
AllocateBlock = 37;
CreateS3Bucket = 41;
DeleteS3Bucket = 42;
InfoS3Bucket = 43;
ListS3Buckets = 44;
InitiateMultiPartUpload = 45;
CommitMultiPartUpload = 46;
ServiceList = 51;
}
message OMRequest {
required Type cmdType = 1; // Type of the command
// A string that identifies this command, we generate Trace ID in Ozone
// frontend and this allows us to trace that command all over ozone.
optional string traceID = 2;
required string clientId = 3;
optional CreateVolumeRequest createVolumeRequest = 11;
optional SetVolumePropertyRequest setVolumePropertyRequest = 12;
optional CheckVolumeAccessRequest checkVolumeAccessRequest = 13;
optional InfoVolumeRequest infoVolumeRequest = 14;
optional DeleteVolumeRequest deleteVolumeRequest = 15;
optional ListVolumeRequest listVolumeRequest = 16;
optional CreateBucketRequest createBucketRequest = 21;
optional InfoBucketRequest infoBucketRequest = 22;
optional SetBucketPropertyRequest setBucketPropertyRequest = 23;
optional DeleteBucketRequest deleteBucketRequest = 24;
optional ListBucketsRequest listBucketsRequest = 25;
optional CreateKeyRequest createKeyRequest = 31;
optional LookupKeyRequest lookupKeyRequest = 32;
optional RenameKeyRequest renameKeyRequest = 33;
optional DeleteKeyRequest deleteKeyRequest = 34;
optional ListKeysRequest listKeysRequest = 35;
optional CommitKeyRequest commitKeyRequest = 36;
optional AllocateBlockRequest allocateBlockRequest = 37;
optional S3CreateBucketRequest createS3BucketRequest = 41;
optional S3DeleteBucketRequest deleteS3BucketRequest = 42;
optional S3BucketInfoRequest infoS3BucketRequest = 43;
optional S3ListBucketsRequest listS3BucketsRequest = 44;
optional MultipartInfoInitiateRequest initiateMultiPartUploadRequest = 45;
optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest = 46;
optional ServiceListRequest serviceListRequest = 51;
}
message OMResponse {
required Type cmdType = 1; // Type of the command
// A string that identifies this command, we generate Trace ID in Ozone
// frontend and this allows us to trace that command all over ozone.
optional string traceID = 2;
optional bool success = 3 [default=true];
optional string message = 4;
optional CreateVolumeResponse createVolumeResponse = 11;
optional SetVolumePropertyResponse setVolumePropertyResponse = 12;
optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13;
optional InfoVolumeResponse infoVolumeResponse = 14;
optional DeleteVolumeResponse deleteVolumeResponse = 15;
optional ListVolumeResponse listVolumeResponse = 16;
optional CreateBucketResponse createBucketResponse = 21;
optional InfoBucketResponse infoBucketResponse = 22;
optional SetBucketPropertyResponse setBucketPropertyResponse = 23;
optional DeleteBucketResponse deleteBucketResponse = 24;
optional ListBucketsResponse listBucketsResponse = 25;
optional CreateKeyResponse createKeyResponse = 31;
optional LookupKeyResponse lookupKeyResponse = 32;
optional RenameKeyResponse renameKeyResponse = 33;
optional DeleteKeyResponse deleteKeyResponse = 34;
optional ListKeysResponse listKeysResponse = 35;
optional CommitKeyResponse commitKeyResponse = 36;
optional AllocateBlockResponse allocateBlockResponse = 37;
optional S3CreateBucketResponse createS3BucketResponse = 41;
optional S3DeleteBucketResponse deleteS3BucketResponse = 42;
optional S3BucketInfoResponse infoS3BucketResponse = 43;
optional S3ListBucketsResponse listS3BucketsResponse = 44;
optional MultipartInfoInitiateResponse initiateMultiPartUploadResponse = 45;
optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse = 46;
optional ServiceListResponse ServiceListResponse = 51;
}
enum Status {
OK = 1;
VOLUME_NOT_UNIQUE = 2;
@ -84,7 +198,6 @@ message CreateVolumeRequest {
}
message CreateVolumeResponse {
required Status status = 1;
}
@ -130,7 +243,6 @@ message InfoVolumeRequest {
message InfoVolumeResponse {
required Status status = 1;
optional VolumeInfo volumeInfo = 2;
}
/**
@ -226,6 +338,23 @@ message InfoBucketResponse {
optional BucketInfo bucketInfo = 2;
}
message SetBucketPropertyRequest {
required BucketArgs bucketArgs = 1;
}
message SetBucketPropertyResponse {
required Status status = 1;
}
message DeleteBucketRequest {
required string volumeName = 1;
required string bucketName = 2;
}
message DeleteBucketResponse {
required Status status = 1;
}
message ListBucketsRequest {
required string volumeName = 1;
optional string startKey = 2;
@ -277,11 +406,24 @@ message KeyInfo {
optional uint64 latestVersion = 10;
}
message LocateKeyRequest {
message CreateKeyRequest {
required KeyArgs keyArgs = 1;
}
message LocateKeyResponse {
message CreateKeyResponse {
required Status status = 1;
optional KeyInfo keyInfo = 2;
// clients' followup request may carry this ID for stateful operations
// (similar to a cookie).
optional uint64 ID = 3;
optional uint64 openVersion = 4;
}
message LookupKeyRequest {
required KeyArgs keyArgs = 1;
}
message LookupKeyResponse {
required Status status = 1;
optional KeyInfo keyInfo = 2;
// clients' followup request may carry this ID for stateful operations (similar
@ -291,14 +433,6 @@ message LocateKeyResponse {
optional uint64 openVersion = 4;
}
message SetBucketPropertyRequest {
required BucketArgs bucketArgs = 1;
}
message SetBucketPropertyResponse {
required Status status = 1;
}
message RenameKeyRequest{
required KeyArgs keyArgs = 1;
required string toKeyName = 2;
@ -308,13 +442,17 @@ message RenameKeyResponse{
required Status status = 1;
}
message DeleteBucketRequest {
required string volumeName = 1;
required string bucketName = 2;
message DeleteKeyRequest {
required KeyArgs keyArgs = 1;
}
message DeleteBucketResponse {
message DeleteKeyResponse {
required Status status = 1;
optional KeyInfo keyInfo = 2;
// clients' followup request may carry this ID for stateful operations
// (similar to a cookie).
optional uint64 ID = 3;
optional uint64 openVersion = 4;
}
message ListKeysRequest {
@ -330,6 +468,15 @@ message ListKeysResponse {
repeated KeyInfo keyInfo = 2;
}
message CommitKeyRequest {
required KeyArgs keyArgs = 1;
required uint64 clientID = 2;
}
message CommitKeyResponse {
required Status status = 1;
}
message AllocateBlockRequest {
required KeyArgs keyArgs = 1;
required uint64 clientID = 2;
@ -340,15 +487,6 @@ message AllocateBlockResponse {
optional KeyLocation keyLocation = 2;
}
message CommitKeyRequest {
required KeyArgs keyArgs = 1;
required uint64 clientID = 2;
}
message CommitKeyResponse {
required Status status = 1;
}
message ServiceListRequest {
}
@ -374,12 +512,20 @@ message ServiceInfo {
repeated ServicePort servicePorts = 3;
}
message S3BucketRequest {
message S3CreateBucketRequest {
required string userName = 1;
required string s3bucketname = 2;
}
message S3BucketResponse {
message S3CreateBucketResponse {
required Status status = 1;
}
message S3DeleteBucketRequest {
required string s3bucketName = 1;
}
message S3DeleteBucketResponse {
required Status status = 1;
}
@ -391,14 +537,6 @@ message S3BucketInfoResponse {
optional string ozoneMapping = 2;
}
message S3DeleteBucketRequest {
required string s3bucketName = 1;
}
message S3DeleteBucketResponse {
required Status status = 1;
}
message S3ListBucketsRequest {
required string userName = 1;
optional string startKey = 2;
@ -411,7 +549,6 @@ message S3ListBucketsResponse {
repeated BucketInfo bucketInfo = 2;
}
message MultipartInfoInitiateRequest {
required KeyArgs keyArgs = 1;
}
@ -446,147 +583,11 @@ message MultipartCommitUploadPartResponse {
required Status status = 2;
}
/**
The OM service that takes care of Ozone namespace.
*/
service OzoneManagerService {
/**
Creates a Volume.
*/
rpc createVolume(CreateVolumeRequest)
returns(CreateVolumeResponse);
/**
Allows modificiation of volume properties.
*/
rpc setVolumeProperty(SetVolumePropertyRequest)
returns (SetVolumePropertyResponse);
/**
Checks if the specified volume is accesible by the specified user.
*/
rpc checkVolumeAccess(CheckVolumeAccessRequest)
returns (CheckVolumeAccessResponse);
/**
Gets Volume information.
*/
rpc infoVolume(InfoVolumeRequest)
returns(InfoVolumeResponse);
/**
Deletes a volume if it is empty.
*/
rpc deleteVolume(DeleteVolumeRequest)
returns (DeleteVolumeResponse);
/**
Lists Volumes
*/
rpc listVolumes(ListVolumeRequest)
returns (ListVolumeResponse);
/**
Creates a Bucket.
*/
rpc createBucket(CreateBucketRequest)
returns(CreateBucketResponse);
/**
Get Bucket information.
*/
rpc infoBucket(InfoBucketRequest)
returns(InfoBucketResponse);
/**
Sets bucket properties.
*/
rpc setBucketProperty(SetBucketPropertyRequest)
returns(SetBucketPropertyResponse);
/**
Get key.
*/
rpc createKey(LocateKeyRequest)
returns(LocateKeyResponse);
/**
Look up for an existing key.
*/
rpc lookupKey(LocateKeyRequest)
returns(LocateKeyResponse);
/**
Rename an existing key within a bucket.
*/
rpc renameKey(RenameKeyRequest)
returns(RenameKeyResponse);
/**
Delete an existing key.
*/
rpc deleteKey(LocateKeyRequest)
returns(LocateKeyResponse);
/**
Deletes a bucket from volume if it is empty.
*/
rpc deleteBucket(DeleteBucketRequest)
returns (DeleteBucketResponse);
/**
List Buckets.
*/
rpc listBuckets(ListBucketsRequest)
returns(ListBucketsResponse);
/**
List Keys.
*/
rpc listKeys(ListKeysRequest)
returns(ListKeysResponse);
/**
Commit a key.
*/
rpc commitKey(CommitKeyRequest)
returns(CommitKeyResponse);
/**
Allocate a new block for a key.
*/
rpc allocateBlock(AllocateBlockRequest)
returns(AllocateBlockResponse);
/**
Returns list of Ozone services with its configuration details.
*/
rpc getServiceList(ServiceListRequest)
returns(ServiceListResponse);
/**
Creates an S3 bucket and creates an ozone mapping for that bucket.
*/
rpc createS3Bucket(S3BucketRequest)
returns(S3BucketResponse);
rpc deleteS3Bucket(S3DeleteBucketRequest)
returns(S3DeleteBucketResponse);
/**
Gets the Ozone Mapping information for the S3Bucket.
*/
rpc getS3Bucketinfo(S3BucketInfoRequest)
returns(S3BucketInfoResponse);
rpc listS3Buckets(S3ListBucketsRequest)
returns(S3ListBucketsResponse);
rpc initiateMultiPartUpload(MultipartInfoInitiateRequest)
returns (MultipartInfoInitiateResponse);
rpc commitMultipartUploadPart(MultipartCommitUploadPartRequest)
returns (MultipartCommitUploadPartResponse);
// A client-to-OM RPC to send client requests to OM Ratis server
rpc submitRequest(OMRequest)
returns(OMResponse);
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,6 +73,7 @@ public final class ObjectStoreHandler implements Closeable {
private final ScmBlockLocationProtocolClientSideTranslatorPB
scmBlockLocationClient;
private final StorageHandler storageHandler;
private ClientId clientId = ClientId.randomId();
/**
* Creates a new ObjectStoreHandler.
@ -117,7 +119,7 @@ public ObjectStoreHandler(Configuration conf) throws IOException {
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
omAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
Client.getRpcTimeout(conf)), clientId.toString());
storageHandler = new DistributedStorageHandler(
new OzoneConfiguration(conf),

View File

@ -265,6 +265,9 @@ public LifeCycle.State getServerState() {
return server.getLifeCycleState();
}
/**
* Get the local directory where ratis logs will be stored.
*/
public static String getOMRatisDirectory(Configuration conf) {
String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);

View File

@ -49,6 +49,10 @@
.CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -57,6 +61,10 @@
.DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -84,9 +92,9 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LocateKeyRequest;
.LookupKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LocateKeyResponse;
.LookupKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -95,6 +103,10 @@
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -104,9 +116,9 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketInfoResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketRequest;
.S3CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketResponse;
.S3CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -129,6 +141,8 @@
.SetVolumePropertyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -157,6 +171,154 @@ public OzoneManagerProtocolServerSideTranslatorPB(
this.impl = impl;
}
/**
* Submit requests to Ratis server for OM HA implmentation.
* TODO: Once HA is implemented fully, we should have only one server side
* translator for OM protocol.
*/
@Override
public OMResponse submitRequest(RpcController controller,
OMRequest request) throws ServiceException {
Type cmdType = request.getCmdType();
OMResponse.Builder responseBuilder = OMResponse.newBuilder()
.setCmdType(cmdType);
switch (cmdType) {
case CreateVolume:
CreateVolumeResponse createVolumeResponse = createVolume(
request.getCreateVolumeRequest());
responseBuilder.setCreateVolumeResponse(createVolumeResponse);
break;
case SetVolumeProperty:
SetVolumePropertyResponse setVolumePropertyResponse = setVolumeProperty(
request.getSetVolumePropertyRequest());
responseBuilder.setSetVolumePropertyResponse(setVolumePropertyResponse);
break;
case CheckVolumeAccess:
CheckVolumeAccessResponse checkVolumeAccessResponse = checkVolumeAccess(
request.getCheckVolumeAccessRequest());
responseBuilder.setCheckVolumeAccessResponse(checkVolumeAccessResponse);
break;
case InfoVolume:
InfoVolumeResponse infoVolumeResponse = infoVolume(
request.getInfoVolumeRequest());
responseBuilder.setInfoVolumeResponse(infoVolumeResponse);
break;
case DeleteVolume:
DeleteVolumeResponse deleteVolumeResponse = deleteVolume(
request.getDeleteVolumeRequest());
responseBuilder.setDeleteVolumeResponse(deleteVolumeResponse);
break;
case ListVolume:
ListVolumeResponse listVolumeResponse = listVolumes(
request.getListVolumeRequest());
responseBuilder.setListVolumeResponse(listVolumeResponse);
break;
case CreateBucket:
CreateBucketResponse createBucketResponse = createBucket(
request.getCreateBucketRequest());
responseBuilder.setCreateBucketResponse(createBucketResponse);
break;
case InfoBucket:
InfoBucketResponse infoBucketResponse = infoBucket(
request.getInfoBucketRequest());
responseBuilder.setInfoBucketResponse(infoBucketResponse);
break;
case SetBucketProperty:
SetBucketPropertyResponse setBucketPropertyResponse = setBucketProperty(
request.getSetBucketPropertyRequest());
responseBuilder.setSetBucketPropertyResponse(setBucketPropertyResponse);
break;
case DeleteBucket:
DeleteBucketResponse deleteBucketResponse = deleteBucket(
request.getDeleteBucketRequest());
responseBuilder.setDeleteBucketResponse(deleteBucketResponse);
break;
case ListBuckets:
ListBucketsResponse listBucketsResponse = listBuckets(
request.getListBucketsRequest());
responseBuilder.setListBucketsResponse(listBucketsResponse);
break;
case CreateKey:
CreateKeyResponse createKeyResponse = createKey(
request.getCreateKeyRequest());
responseBuilder.setCreateKeyResponse(createKeyResponse);
break;
case LookupKey:
LookupKeyResponse lookupKeyResponse = lookupKey(
request.getLookupKeyRequest());
responseBuilder.setLookupKeyResponse(lookupKeyResponse);
break;
case RenameKey:
RenameKeyResponse renameKeyResponse = renameKey(
request.getRenameKeyRequest());
responseBuilder.setRenameKeyResponse(renameKeyResponse);
break;
case DeleteKey:
DeleteKeyResponse deleteKeyResponse = deleteKey(
request.getDeleteKeyRequest());
responseBuilder.setDeleteKeyResponse(deleteKeyResponse);
break;
case ListKeys:
ListKeysResponse listKeysResponse = listKeys(
request.getListKeysRequest());
responseBuilder.setListKeysResponse(listKeysResponse);
break;
case CommitKey:
CommitKeyResponse commitKeyResponse = commitKey(
request.getCommitKeyRequest());
responseBuilder.setCommitKeyResponse(commitKeyResponse);
break;
case AllocateBlock:
AllocateBlockResponse allocateBlockResponse = allocateBlock(
request.getAllocateBlockRequest());
responseBuilder.setAllocateBlockResponse(allocateBlockResponse);
break;
case CreateS3Bucket:
S3CreateBucketResponse s3CreateBucketResponse = createS3Bucket(
request.getCreateS3BucketRequest());
responseBuilder.setCreateS3BucketResponse(s3CreateBucketResponse);
break;
case DeleteS3Bucket:
S3DeleteBucketResponse s3DeleteBucketResponse = deleteS3Bucket(
request.getDeleteS3BucketRequest());
responseBuilder.setDeleteS3BucketResponse(s3DeleteBucketResponse);
break;
case InfoS3Bucket:
S3BucketInfoResponse s3BucketInfoResponse = getS3Bucketinfo(
request.getInfoS3BucketRequest());
responseBuilder.setInfoS3BucketResponse(s3BucketInfoResponse);
break;
case ListS3Buckets:
S3ListBucketsResponse s3ListBucketsResponse = listS3Buckets(
request.getListS3BucketsRequest());
responseBuilder.setListS3BucketsResponse(s3ListBucketsResponse);
break;
case InitiateMultiPartUpload:
MultipartInfoInitiateResponse multipartInfoInitiateResponse =
initiateMultiPartUpload(request.getInitiateMultiPartUploadRequest());
responseBuilder.setInitiateMultiPartUploadResponse(
multipartInfoInitiateResponse);
break;
case CommitMultiPartUpload:
MultipartCommitUploadPartResponse commitUploadPartResponse =
commitMultipartUploadPart(request.getCommitMultiPartUploadRequest());
responseBuilder.setCommitMultiPartUploadResponse(
commitUploadPartResponse);
break;
case ServiceList:
ServiceListResponse serviceListResponse = getServiceList(
request.getServiceListRequest());
responseBuilder.setServiceListResponse(serviceListResponse);
break;
default:
responseBuilder.setSuccess(false);
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
break;
}
return responseBuilder.build();
}
// Convert and exception to corresponding status code
private Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof OMException) {
@ -218,10 +380,7 @@ private Status exceptionToResponseStatus(IOException ex) {
}
}
@Override
public CreateVolumeResponse createVolume(
RpcController controller, CreateVolumeRequest request)
throws ServiceException {
private CreateVolumeResponse createVolume(CreateVolumeRequest request) {
CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
@ -232,10 +391,8 @@ public CreateVolumeResponse createVolume(
return resp.build();
}
@Override
public SetVolumePropertyResponse setVolumeProperty(
RpcController controller, SetVolumePropertyRequest request)
throws ServiceException {
private SetVolumePropertyResponse setVolumeProperty(
SetVolumePropertyRequest request) {
SetVolumePropertyResponse.Builder resp =
SetVolumePropertyResponse.newBuilder();
resp.setStatus(Status.OK);
@ -255,10 +412,8 @@ public SetVolumePropertyResponse setVolumeProperty(
return resp.build();
}
@Override
public CheckVolumeAccessResponse checkVolumeAccess(
RpcController controller, CheckVolumeAccessRequest request)
throws ServiceException {
private CheckVolumeAccessResponse checkVolumeAccess(
CheckVolumeAccessRequest request) {
CheckVolumeAccessResponse.Builder resp =
CheckVolumeAccessResponse.newBuilder();
resp.setStatus(Status.OK);
@ -276,10 +431,7 @@ public CheckVolumeAccessResponse checkVolumeAccess(
return resp.build();
}
@Override
public InfoVolumeResponse infoVolume(
RpcController controller, InfoVolumeRequest request)
throws ServiceException {
private InfoVolumeResponse infoVolume(InfoVolumeRequest request) {
InfoVolumeResponse.Builder resp = InfoVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
String volume = request.getVolumeName();
@ -292,10 +444,7 @@ public InfoVolumeResponse infoVolume(
return resp.build();
}
@Override
public DeleteVolumeResponse deleteVolume(
RpcController controller, DeleteVolumeRequest request)
throws ServiceException {
private DeleteVolumeResponse deleteVolume(DeleteVolumeRequest request) {
DeleteVolumeResponse.Builder resp = DeleteVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
@ -306,9 +455,7 @@ public DeleteVolumeResponse deleteVolume(
return resp.build();
}
@Override
public ListVolumeResponse listVolumes(
RpcController controller, ListVolumeRequest request)
private ListVolumeResponse listVolumes(ListVolumeRequest request)
throws ServiceException {
ListVolumeResponse.Builder resp = ListVolumeResponse.newBuilder();
List<OmVolumeArgs> result = Lists.newArrayList();
@ -336,10 +483,7 @@ public ListVolumeResponse listVolumes(
return resp.build();
}
@Override
public CreateBucketResponse createBucket(
RpcController controller, CreateBucketRequest
request) throws ServiceException {
private CreateBucketResponse createBucket(CreateBucketRequest request) {
CreateBucketResponse.Builder resp =
CreateBucketResponse.newBuilder();
try {
@ -352,10 +496,7 @@ public CreateBucketResponse createBucket(
return resp.build();
}
@Override
public InfoBucketResponse infoBucket(
RpcController controller, InfoBucketRequest request)
throws ServiceException {
private InfoBucketResponse infoBucket(InfoBucketRequest request) {
InfoBucketResponse.Builder resp =
InfoBucketResponse.newBuilder();
try {
@ -369,12 +510,9 @@ public InfoBucketResponse infoBucket(
return resp.build();
}
@Override
public LocateKeyResponse createKey(
RpcController controller, LocateKeyRequest request
) throws ServiceException {
LocateKeyResponse.Builder resp =
LocateKeyResponse.newBuilder();
private CreateKeyResponse createKey(CreateKeyRequest request) {
CreateKeyResponse.Builder resp =
CreateKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
HddsProtos.ReplicationType type =
@ -408,12 +546,9 @@ public LocateKeyResponse createKey(
return resp.build();
}
@Override
public LocateKeyResponse lookupKey(
RpcController controller, LocateKeyRequest request
) throws ServiceException {
LocateKeyResponse.Builder resp =
LocateKeyResponse.newBuilder();
private LookupKeyResponse lookupKey(LookupKeyRequest request) {
LookupKeyResponse.Builder resp =
LookupKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
@ -430,10 +565,7 @@ public LocateKeyResponse lookupKey(
return resp.build();
}
@Override
public RenameKeyResponse renameKey(
RpcController controller, RenameKeyRequest request)
throws ServiceException {
private RenameKeyResponse renameKey(RenameKeyRequest request) {
RenameKeyResponse.Builder resp = RenameKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
@ -450,10 +582,8 @@ public RenameKeyResponse renameKey(
return resp.build();
}
@Override
public SetBucketPropertyResponse setBucketProperty(
RpcController controller, SetBucketPropertyRequest request)
throws ServiceException {
private SetBucketPropertyResponse setBucketProperty(
SetBucketPropertyRequest request) {
SetBucketPropertyResponse.Builder resp =
SetBucketPropertyResponse.newBuilder();
try {
@ -466,11 +596,9 @@ public SetBucketPropertyResponse setBucketProperty(
return resp.build();
}
@Override
public LocateKeyResponse deleteKey(RpcController controller,
LocateKeyRequest request) throws ServiceException {
LocateKeyResponse.Builder resp =
LocateKeyResponse.newBuilder();
private DeleteKeyResponse deleteKey(DeleteKeyRequest request) {
DeleteKeyResponse.Builder resp =
DeleteKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
@ -486,10 +614,7 @@ public LocateKeyResponse deleteKey(RpcController controller,
return resp.build();
}
@Override
public DeleteBucketResponse deleteBucket(
RpcController controller, DeleteBucketRequest request)
throws ServiceException {
private DeleteBucketResponse deleteBucket(DeleteBucketRequest request) {
DeleteBucketResponse.Builder resp = DeleteBucketResponse.newBuilder();
resp.setStatus(Status.OK);
try {
@ -500,10 +625,7 @@ public DeleteBucketResponse deleteBucket(
return resp.build();
}
@Override
public ListBucketsResponse listBuckets(
RpcController controller, ListBucketsRequest request)
throws ServiceException {
private ListBucketsResponse listBuckets(ListBucketsRequest request) {
ListBucketsResponse.Builder resp =
ListBucketsResponse.newBuilder();
try {
@ -522,9 +644,7 @@ public ListBucketsResponse listBuckets(
return resp.build();
}
@Override
public ListKeysResponse listKeys(RpcController controller,
ListKeysRequest request) throws ServiceException {
private ListKeysResponse listKeys(ListKeysRequest request) {
ListKeysResponse.Builder resp =
ListKeysResponse.newBuilder();
try {
@ -544,9 +664,7 @@ public ListKeysResponse listKeys(RpcController controller,
return resp.build();
}
@Override
public CommitKeyResponse commitKey(RpcController controller,
CommitKeyRequest request) throws ServiceException {
private CommitKeyResponse commitKey(CommitKeyRequest request) {
CommitKeyResponse.Builder resp =
CommitKeyResponse.newBuilder();
try {
@ -574,9 +692,7 @@ public CommitKeyResponse commitKey(RpcController controller,
return resp.build();
}
@Override
public AllocateBlockResponse allocateBlock(RpcController controller,
AllocateBlockRequest request) throws ServiceException {
private AllocateBlockResponse allocateBlock(AllocateBlockRequest request) {
AllocateBlockResponse.Builder resp =
AllocateBlockResponse.newBuilder();
try {
@ -596,9 +712,7 @@ public AllocateBlockResponse allocateBlock(RpcController controller,
return resp.build();
}
@Override
public ServiceListResponse getServiceList(RpcController controller,
ServiceListRequest request) throws ServiceException {
private ServiceListResponse getServiceList(ServiceListRequest request) {
ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
try {
resp.addAllServiceInfo(impl.getServiceList().stream()
@ -611,10 +725,8 @@ public ServiceListResponse getServiceList(RpcController controller,
return resp.build();
}
@Override
public S3BucketResponse createS3Bucket(RpcController controller,
S3BucketRequest request) throws ServiceException {
S3BucketResponse.Builder resp = S3BucketResponse.newBuilder();
private S3CreateBucketResponse createS3Bucket(S3CreateBucketRequest request) {
S3CreateBucketResponse.Builder resp = S3CreateBucketResponse.newBuilder();
try {
impl.createS3Bucket(request.getUserName(), request.getS3Bucketname());
resp.setStatus(Status.OK);
@ -624,10 +736,7 @@ public S3BucketResponse createS3Bucket(RpcController controller,
return resp.build();
}
@Override
public S3DeleteBucketResponse deleteS3Bucket(RpcController controller,
S3DeleteBucketRequest request) throws
ServiceException {
private S3DeleteBucketResponse deleteS3Bucket(S3DeleteBucketRequest request) {
S3DeleteBucketResponse.Builder resp = S3DeleteBucketResponse.newBuilder();
try {
impl.deleteS3Bucket(request.getS3BucketName());
@ -638,9 +747,7 @@ public S3DeleteBucketResponse deleteS3Bucket(RpcController controller,
return resp.build();
}
@Override
public S3BucketInfoResponse getS3Bucketinfo(RpcController controller,
S3BucketInfoRequest request) throws ServiceException {
private S3BucketInfoResponse getS3Bucketinfo(S3BucketInfoRequest request) {
S3BucketInfoResponse.Builder resp = S3BucketInfoResponse.newBuilder();
try {
resp.setOzoneMapping(
@ -652,9 +759,7 @@ public S3BucketInfoResponse getS3Bucketinfo(RpcController controller,
return resp.build();
}
@Override
public S3ListBucketsResponse listS3Buckets(RpcController controller,
S3ListBucketsRequest request) {
private S3ListBucketsResponse listS3Buckets(S3ListBucketsRequest request) {
S3ListBucketsResponse.Builder resp = S3ListBucketsResponse.newBuilder();
try {
List<OmBucketInfo> buckets = impl.listS3Buckets(
@ -672,9 +777,8 @@ public S3ListBucketsResponse listS3Buckets(RpcController controller,
return resp.build();
}
@Override
public MultipartInfoInitiateResponse initiateMultiPartUpload(
RpcController controller, MultipartInfoInitiateRequest request) {
private MultipartInfoInitiateResponse initiateMultiPartUpload(
MultipartInfoInitiateRequest request) {
MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
.newBuilder();
try {
@ -698,9 +802,8 @@ public MultipartInfoInitiateResponse initiateMultiPartUpload(
return resp.build();
}
@Override
public MultipartCommitUploadPartResponse commitMultipartUploadPart(
RpcController controller, MultipartCommitUploadPartRequest request) {
private MultipartCommitUploadPartResponse commitMultipartUploadPart(
MultipartCommitUploadPartRequest request) {
MultipartCommitUploadPartResponse.Builder resp =
MultipartCommitUploadPartResponse.newBuilder();
try {
@ -722,5 +825,4 @@ public MultipartCommitUploadPartResponse commitMultipartUploadPart(
}
return resp.build();
}
}