diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 6a458873d7..1743372699 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -70,6 +70,7 @@ .StorageContainerLocationProtocolPB; import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.util.Strings; +import org.apache.ratis.protocol.ClientId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +105,7 @@ public class RpcClient implements ClientProtocol { private final long streamBufferMaxSize; private final long blockSize; private final long watchTimeout; + private ClientId clientId = ClientId.randomId(); /** * Creates RpcClient instance with the given configuration. @@ -129,7 +131,7 @@ public RpcClient(Configuration conf) throws IOException { RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); + Client.getRpcTimeout(conf)), clientId.toString()); long scmVersion = RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index ca09f61205..2059c83fe4 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -19,9 +19,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import java.util.ArrayList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; @@ -40,10 +40,18 @@ .OzoneManagerProtocolProtos.AllocateBlockRequest; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.AllocateBlockResponse; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.CreateKeyResponse; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.CommitKeyRequest; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.CommitKeyResponse; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.DeleteKeyRequest; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.DeleteKeyResponse; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.BucketArgs; import org.apache.hadoop.ozone.protocol.proto @@ -69,9 +77,9 @@ import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.CreateVolumeResponse; import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.LocateKeyRequest; + .OzoneManagerProtocolProtos.LookupKeyRequest; import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.LocateKeyResponse; + .OzoneManagerProtocolProtos.LookupKeyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartCommitUploadPartRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -125,9 +133,9 @@ import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.ServiceListResponse; import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3BucketRequest; + .OzoneManagerProtocolProtos.S3CreateBucketRequest; import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.S3BucketResponse; + .OzoneManagerProtocolProtos.S3CreateBucketResponse; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.S3DeleteBucketRequest; import org.apache.hadoop.ozone.protocol.proto @@ -140,14 +148,15 @@ .S3ListBucketsRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .S3ListBucketsResponse; - - - +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.ArrayList; import java.util.stream.Collectors; /** @@ -164,14 +173,16 @@ public final class OzoneManagerProtocolClientSideTranslatorPB private static final RpcController NULL_RPC_CONTROLLER = null; private final OzoneManagerProtocolPB rpcProxy; + private final String clientID; /** * Constructor for KeySpaceManger Client. * @param rpcProxy */ public OzoneManagerProtocolClientSideTranslatorPB( - OzoneManagerProtocolPB rpcProxy) { + OzoneManagerProtocolPB rpcProxy, String clientId) { this.rpcProxy = rpcProxy; + this.clientID = clientId; } /** @@ -192,6 +203,41 @@ public void close() throws IOException { } + /** + * Return the proxy object underlying this protocol translator. + * + * @return the proxy object underlying this protocol translator. + */ + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + /** + * Returns a OMRequest builder with specified type. + * @param cmdType type of the request + */ + private OMRequest.Builder createOMRequest(Type cmdType) { + return OMRequest.newBuilder() + .setCmdType(cmdType) + .setClientId(clientID); + } + + /** + * Submits client request to OM server. + * @param omRequest client request + * @return response from OM + * @throws IOException thrown if any Protobuf service exception occurs + */ + private OMResponse submitRequest(OMRequest omRequest) + throws IOException { + try { + return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, omRequest); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + /** * Creates a volume. * @@ -205,13 +251,12 @@ public void createVolume(OmVolumeArgs args) throws IOException { VolumeInfo volumeInfo = args.getProtobuf(); req.setVolumeInfo(volumeInfo); - final CreateVolumeResponse resp; - try { - resp = rpcProxy.createVolume(NULL_RPC_CONTROLLER, - req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.CreateVolume) + .setCreateVolumeRequest(req) + .build(); + + CreateVolumeResponse resp = submitRequest(omRequest) + .getCreateVolumeResponse(); if (resp.getStatus() != Status.OK) { throw new @@ -231,12 +276,14 @@ public void setOwner(String volume, String owner) throws IOException { SetVolumePropertyRequest.Builder req = SetVolumePropertyRequest.newBuilder(); req.setVolumeName(volume).setOwnerName(owner); - final SetVolumePropertyResponse resp; - try { - resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.SetVolumeProperty) + .setSetVolumePropertyRequest(req) + .build(); + + SetVolumePropertyResponse resp = submitRequest(omRequest) + .getSetVolumePropertyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Volume owner change failed, error:" + resp.getStatus()); @@ -255,12 +302,14 @@ public void setQuota(String volume, long quota) throws IOException { SetVolumePropertyRequest.Builder req = SetVolumePropertyRequest.newBuilder(); req.setVolumeName(volume).setQuotaInBytes(quota); - final SetVolumePropertyResponse resp; - try { - resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.SetVolumeProperty) + .setSetVolumePropertyRequest(req) + .build(); + + SetVolumePropertyResponse resp = submitRequest(omRequest) + .getSetVolumePropertyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Volume quota change failed, error:" + resp.getStatus()); @@ -282,12 +331,13 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws CheckVolumeAccessRequest.Builder req = CheckVolumeAccessRequest.newBuilder(); req.setVolumeName(volume).setUserAcl(userAcl); - final CheckVolumeAccessResponse resp; - try { - resp = rpcProxy.checkVolumeAccess(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.CheckVolumeAccess) + .setCheckVolumeAccessRequest(req) + .build(); + + CheckVolumeAccessResponse resp = submitRequest(omRequest) + .getCheckVolumeAccessResponse(); if (resp.getStatus() == Status.ACCESS_DENIED) { return false; @@ -310,12 +360,13 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws public OmVolumeArgs getVolumeInfo(String volume) throws IOException { InfoVolumeRequest.Builder req = InfoVolumeRequest.newBuilder(); req.setVolumeName(volume); - final InfoVolumeResponse resp; - try { - resp = rpcProxy.infoVolume(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.InfoVolume) + .setInfoVolumeRequest(req) + .build(); + + InfoVolumeResponse resp = submitRequest(omRequest).getInfoVolumeResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Info Volume failed, error:" + resp.getStatus()); @@ -333,12 +384,14 @@ public OmVolumeArgs getVolumeInfo(String volume) throws IOException { public void deleteVolume(String volume) throws IOException { DeleteVolumeRequest.Builder req = DeleteVolumeRequest.newBuilder(); req.setVolumeName(volume); - final DeleteVolumeResponse resp; - try { - resp = rpcProxy.deleteVolume(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.DeleteVolume) + .setDeleteVolumeRequest(req) + .build(); + + DeleteVolumeResponse resp = submitRequest(omRequest) + .getDeleteVolumeResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Delete Volume failed, error:" + resp.getStatus()); @@ -400,24 +453,18 @@ public List listAllVolumes(String prefix, String prevKey, private List listVolume(ListVolumeRequest request) throws IOException { - final ListVolumeResponse resp; - try { - resp = rpcProxy.listVolumes(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.ListVolume) + .setListVolumeRequest(request) + .build(); + + ListVolumeResponse resp = submitRequest(omRequest).getListVolumeResponse(); if (resp.getStatus() != Status.OK) { throw new IOException("List volume failed, error: " + resp.getStatus()); } - List result = Lists.newArrayList(); - for (VolumeInfo volInfo : resp.getVolumeInfoList()) { - OmVolumeArgs volArgs = OmVolumeArgs.getFromProtobuf(volInfo); - result.add(volArgs); - } - return resp.getVolumeInfoList().stream() .map(item -> OmVolumeArgs.getFromProtobuf(item)) .collect(Collectors.toList()); @@ -436,13 +483,13 @@ public void createBucket(OmBucketInfo bucketInfo) throws IOException { BucketInfo bucketInfoProtobuf = bucketInfo.getProtobuf(); req.setBucketInfo(bucketInfoProtobuf); - final CreateBucketResponse resp; - try { - resp = rpcProxy.createBucket(NULL_RPC_CONTROLLER, - req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.CreateBucket) + .setCreateBucketRequest(req) + .build(); + + CreateBucketResponse resp = submitRequest(omRequest) + .getCreateBucketResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Bucket creation failed, error: " + resp.getStatus()); @@ -465,13 +512,12 @@ public OmBucketInfo getBucketInfo(String volume, String bucket) req.setVolumeName(volume); req.setBucketName(bucket); - final InfoBucketResponse resp; - try { - resp = rpcProxy.infoBucket(NULL_RPC_CONTROLLER, - req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.InfoBucket) + .setInfoBucketRequest(req) + .build(); + + InfoBucketResponse resp = submitRequest(omRequest).getInfoBucketResponse(); + if (resp.getStatus() == Status.OK) { return OmBucketInfo.getFromProtobuf(resp.getBucketInfo()); } else { @@ -492,13 +538,14 @@ public void setBucketProperty(OmBucketArgs args) SetBucketPropertyRequest.newBuilder(); BucketArgs bucketArgs = args.getProtobuf(); req.setBucketArgs(bucketArgs); - final SetBucketPropertyResponse resp; - try { - resp = rpcProxy.setBucketProperty(NULL_RPC_CONTROLLER, - req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.SetBucketProperty) + .setSetBucketPropertyRequest(req) + .build(); + + SetBucketPropertyResponse resp = submitRequest(omRequest) + .getSetBucketPropertyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Setting bucket property failed, error: " + resp.getStatus()); @@ -529,12 +576,13 @@ public List listBuckets(String volumeName, reqBuilder.setPrefix(prefix); } ListBucketsRequest request = reqBuilder.build(); - final ListBucketsResponse resp; - try { - resp = rpcProxy.listBuckets(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.ListBuckets) + .setListBucketsRequest(request) + .build(); + + ListBucketsResponse resp = submitRequest(omRequest) + .getListBucketsResponse(); if (resp.getStatus() == Status.OK) { buckets.addAll( @@ -557,7 +605,7 @@ public List listBuckets(String volumeName, */ @Override public OpenKeySession openKey(OmKeyArgs args) throws IOException { - LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder(); + CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder(); KeyArgs.Builder keyArgs = KeyArgs.newBuilder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) @@ -584,12 +632,12 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { req.setKeyArgs(keyArgs.build()); - final LocateKeyResponse resp; - try { - resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.CreateKey) + .setCreateKeyRequest(req) + .build(); + + CreateKeyResponse resp = submitRequest(omRequest).getCreateKeyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Create key failed, error:" + resp.getStatus()); } @@ -598,7 +646,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } @Override - public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId) throws IOException { AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder(); KeyArgs keyArgs = KeyArgs.newBuilder() @@ -607,14 +655,15 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) .setKeyName(args.getKeyName()) .setDataSize(args.getDataSize()).build(); req.setKeyArgs(keyArgs); - req.setClientID(clientID); + req.setClientID(clientId); + + OMRequest omRequest = createOMRequest(Type.AllocateBlock) + .setAllocateBlockRequest(req) + .build(); + + AllocateBlockResponse resp = submitRequest(omRequest) + .getAllocateBlockResponse(); - final AllocateBlockResponse resp; - try { - resp = rpcProxy.allocateBlock(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } if (resp.getStatus() != Status.OK) { throw new IOException("Allocate block failed, error:" + resp.getStatus()); @@ -623,7 +672,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) } @Override - public void commitKey(OmKeyArgs args, long clientID) + public void commitKey(OmKeyArgs args, long clientId) throws IOException { CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder(); List locationInfoList = args.getLocationInfoList(); @@ -637,14 +686,14 @@ public void commitKey(OmKeyArgs args, long clientID) locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf) .collect(Collectors.toList())).build(); req.setKeyArgs(keyArgs); - req.setClientID(clientID); + req.setClientID(clientId); + + OMRequest omRequest = createOMRequest(Type.CommitKey) + .setCommitKeyRequest(req) + .build(); + + CommitKeyResponse resp = submitRequest(omRequest).getCommitKeyResponse(); - final CommitKeyResponse resp; - try { - resp = rpcProxy.commitKey(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } if (resp.getStatus() != Status.OK) { throw new IOException("Commit key failed, error:" + resp.getStatus()); @@ -654,7 +703,7 @@ public void commitKey(OmKeyArgs args, long clientID) @Override public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { - LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder(); + LookupKeyRequest.Builder req = LookupKeyRequest.newBuilder(); KeyArgs keyArgs = KeyArgs.newBuilder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) @@ -662,12 +711,12 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { .setDataSize(args.getDataSize()).build(); req.setKeyArgs(keyArgs); - final LocateKeyResponse resp; - try { - resp = rpcProxy.lookupKey(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.LookupKey) + .setLookupKeyRequest(req) + .build(); + + LookupKeyResponse resp = submitRequest(omRequest).getLookupKeyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Lookup key failed, error:" + resp.getStatus()); @@ -686,12 +735,12 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { req.setKeyArgs(keyArgs); req.setToKeyName(toKeyName); - final RenameKeyResponse resp; - try { - resp = rpcProxy.renameKey(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.RenameKey) + .setRenameKeyRequest(req) + .build(); + + RenameKeyResponse resp = submitRequest(omRequest).getRenameKeyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Rename key failed, error:" + resp.getStatus()); @@ -706,19 +755,19 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { */ @Override public void deleteKey(OmKeyArgs args) throws IOException { - LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder(); + DeleteKeyRequest.Builder req = DeleteKeyRequest.newBuilder(); KeyArgs keyArgs = KeyArgs.newBuilder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) .setKeyName(args.getKeyName()).build(); req.setKeyArgs(keyArgs); - final LocateKeyResponse resp; - try { - resp = rpcProxy.deleteKey(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest(Type.DeleteKey) + .setDeleteKeyRequest(req) + .build(); + + DeleteKeyResponse resp = submitRequest(omRequest).getDeleteKeyResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Delete key failed, error:" + resp.getStatus()); @@ -735,12 +784,14 @@ public void deleteBucket(String volume, String bucket) throws IOException { DeleteBucketRequest.Builder req = DeleteBucketRequest.newBuilder(); req.setVolumeName(volume); req.setBucketName(bucket); - final DeleteBucketResponse resp; - try { - resp = rpcProxy.deleteBucket(NULL_RPC_CONTROLLER, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.DeleteBucket) + .setDeleteBucketRequest(req) + .build(); + + DeleteBucketResponse resp = submitRequest(omRequest) + .getDeleteBucketResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Delete Bucket failed, error:" + resp.getStatus()); @@ -767,13 +818,13 @@ public List listKeys(String volumeName, String bucketName, reqBuilder.setPrefix(prefix); } - ListKeysRequest request = reqBuilder.build(); - final ListKeysResponse resp; - try { - resp = rpcProxy.listKeys(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + ListKeysRequest req = reqBuilder.build(); + + OMRequest omRequest = createOMRequest(Type.ListKeys) + .setListKeysRequest(req) + .build(); + + ListKeysResponse resp = submitRequest(omRequest).getListKeysResponse(); if (resp.getStatus() == Status.OK) { keys.addAll( @@ -787,39 +838,20 @@ public List listKeys(String volumeName, String bucketName, } } - @Override - public List getServiceList() throws IOException { - ServiceListRequest request = ServiceListRequest.newBuilder().build(); - final ServiceListResponse resp; - try { - resp = rpcProxy.getServiceList(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - - if (resp.getStatus() == Status.OK) { - return resp.getServiceInfoList().stream() - .map(ServiceInfo::getFromProtobuf) - .collect(Collectors.toList()); - } else { - throw new IOException("Getting service list failed, error: " - + resp.getStatus()); - } - } - @Override public void createS3Bucket(String userName, String s3BucketName) throws IOException { - S3BucketRequest request = S3BucketRequest.newBuilder() + S3CreateBucketRequest req = S3CreateBucketRequest.newBuilder() .setUserName(userName) .setS3Bucketname(s3BucketName) .build(); - final S3BucketResponse resp; - try { - resp = rpcProxy.createS3Bucket(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.CreateS3Bucket) + .setCreateS3BucketRequest(req) + .build(); + + S3CreateBucketResponse resp = submitRequest(omRequest) + .getCreateS3BucketResponse(); if(resp.getStatus() != Status.OK) { throw new IOException("Creating S3 bucket failed, error: " @@ -833,12 +865,13 @@ public void deleteS3Bucket(String s3BucketName) throws IOException { S3DeleteBucketRequest request = S3DeleteBucketRequest.newBuilder() .setS3BucketName(s3BucketName) .build(); - final S3DeleteBucketResponse resp; - try { - resp = rpcProxy.deleteS3Bucket(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.DeleteS3Bucket) + .setDeleteS3BucketRequest(request) + .build(); + + S3DeleteBucketResponse resp = submitRequest(omRequest) + .getDeleteS3BucketResponse(); if(resp.getStatus() != Status.OK) { throw new IOException("Creating S3 bucket failed, error: " @@ -853,12 +886,14 @@ public String getOzoneBucketMapping(String s3BucketName) S3BucketInfoRequest request = S3BucketInfoRequest.newBuilder() .setS3BucketName(s3BucketName) .build(); - final S3BucketInfoResponse resp; - try { - resp = rpcProxy.getS3Bucketinfo(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.InfoS3Bucket) + .setInfoS3BucketRequest(request) + .build(); + + S3BucketInfoResponse resp = submitRequest(omRequest) + .getInfoS3BucketResponse(); + if(resp.getStatus() != Status.OK) { throw new IOException("GetOzoneBucketMapping failed, error:" + resp .getStatus()); @@ -881,12 +916,13 @@ public List listS3Buckets(String userName, String startKey, reqBuilder.setPrefix(prefix); } S3ListBucketsRequest request = reqBuilder.build(); - final S3ListBucketsResponse resp; - try { - resp = rpcProxy.listS3Buckets(NULL_RPC_CONTROLLER, request); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + + OMRequest omRequest = createOMRequest(Type.ListS3Buckets) + .setListS3BucketsRequest(request) + .build(); + + S3ListBucketsResponse resp = submitRequest(omRequest) + .getListS3BucketsResponse(); if (resp.getStatus() == Status.OK) { buckets.addAll( @@ -900,16 +936,6 @@ public List listS3Buckets(String userName, String startKey, } } - /** - * Return the proxy object underlying this protocol translator. - * - * @return the proxy object underlying this protocol translator. - */ - @Override - public Object getUnderlyingProxyObject() { - return null; - } - @Override public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws IOException { @@ -925,13 +951,14 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws .setType(omKeyArgs.getType()); multipartInfoInitiateRequest.setKeyArgs(keyArgs.build()); - MultipartInfoInitiateResponse resp; - try { - resp = rpcProxy.initiateMultiPartUpload(NULL_RPC_CONTROLLER, - multipartInfoInitiateRequest.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } + OMRequest omRequest = createOMRequest( + Type.InitiateMultiPartUpload) + .setInitiateMultiPartUploadRequest(multipartInfoInitiateRequest.build()) + .build(); + + MultipartInfoInitiateResponse resp = submitRequest(omRequest) + .getInitiateMultiPartUploadResponse(); + if (resp.getStatus() != Status.OK) { throw new IOException("Initiate Multipart upload failed, error:" + resp .getStatus()); @@ -942,7 +969,7 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws @Override public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( - OmKeyArgs omKeyArgs, long clientID) throws IOException { + OmKeyArgs omKeyArgs, long clientId) throws IOException { MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest = MultipartCommitUploadPartRequest.newBuilder(); @@ -953,18 +980,17 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( .setMultipartUploadID(omKeyArgs.getMultipartUploadID()) .setIsMultipartKey(omKeyArgs.getIsMultipartKey()) .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber()); - multipartCommitUploadPartRequest.setClientID(clientID); + multipartCommitUploadPartRequest.setClientID(clientId); multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build()); - MultipartCommitUploadPartResponse response; + OMRequest omRequest = createOMRequest( + Type.CommitMultiPartUpload) + .setCommitMultiPartUploadRequest(multipartCommitUploadPartRequest + .build()) + .build(); - try { - response = rpcProxy.commitMultipartUploadPart(NULL_RPC_CONTROLLER, - multipartCommitUploadPartRequest.build()); - - } catch (ServiceException ex) { - throw ProtobufHelper.getRemoteException(ex); - } + MultipartCommitUploadPartResponse response = submitRequest(omRequest) + .getCommitMultiPartUploadResponse(); if (response.getStatus() != Status.OK) { throw new IOException("Commit multipart upload part key failed, error:" @@ -975,4 +1001,24 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( OmMultipartCommitUploadPartInfo(response.getPartName()); return info; } + + public List getServiceList() throws IOException { + ServiceListRequest req = ServiceListRequest.newBuilder().build(); + + OMRequest omRequest = createOMRequest(Type.ServiceList) + .setServiceListRequest(req) + .build(); + + final ServiceListResponse resp = submitRequest(omRequest) + .getServiceListResponse(); + + if (resp.getStatus() == Status.OK) { + return resp.getServiceInfoList().stream() + .map(ServiceInfo::getFromProtobuf) + .collect(Collectors.toList()); + } else { + throw new IOException("Getting service list failed, error: " + + resp.getStatus()); + } + } } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index deb88ee086..93c86f12bc 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -22,6 +22,7 @@ * for what changes are allowed for a *unstable* .proto interface. */ +syntax = "proto2"; option java_package = "org.apache.hadoop.ozone.protocol.proto"; option java_outer_classname = "OzoneManagerProtocolProtos"; option java_generic_services = true; @@ -29,13 +30,126 @@ option java_generate_equals_and_hash = true; package hadoop.ozone; /** -This is file contains the protocol to communicate with +This file contains the protocol to communicate with Ozone Manager. Ozone Manager manages the namespace for ozone. This is similar to Namenode for Ozone. */ import "hdds.proto"; +enum Type { + CreateVolume = 11; + SetVolumeProperty = 12; + CheckVolumeAccess = 13; + InfoVolume = 14; + DeleteVolume = 15; + ListVolume = 16; + + CreateBucket = 21; + InfoBucket = 22; + SetBucketProperty = 23; + DeleteBucket = 24; + ListBuckets = 25; + + CreateKey = 31; + LookupKey = 32; + RenameKey = 33; + DeleteKey = 34; + ListKeys = 35; + CommitKey = 36; + AllocateBlock = 37; + + CreateS3Bucket = 41; + DeleteS3Bucket = 42; + InfoS3Bucket = 43; + ListS3Buckets = 44; + InitiateMultiPartUpload = 45; + CommitMultiPartUpload = 46; + + ServiceList = 51; +} + +message OMRequest { + required Type cmdType = 1; // Type of the command + + // A string that identifies this command, we generate Trace ID in Ozone + // frontend and this allows us to trace that command all over ozone. + optional string traceID = 2; + + required string clientId = 3; + + optional CreateVolumeRequest createVolumeRequest = 11; + optional SetVolumePropertyRequest setVolumePropertyRequest = 12; + optional CheckVolumeAccessRequest checkVolumeAccessRequest = 13; + optional InfoVolumeRequest infoVolumeRequest = 14; + optional DeleteVolumeRequest deleteVolumeRequest = 15; + optional ListVolumeRequest listVolumeRequest = 16; + + optional CreateBucketRequest createBucketRequest = 21; + optional InfoBucketRequest infoBucketRequest = 22; + optional SetBucketPropertyRequest setBucketPropertyRequest = 23; + optional DeleteBucketRequest deleteBucketRequest = 24; + optional ListBucketsRequest listBucketsRequest = 25; + + optional CreateKeyRequest createKeyRequest = 31; + optional LookupKeyRequest lookupKeyRequest = 32; + optional RenameKeyRequest renameKeyRequest = 33; + optional DeleteKeyRequest deleteKeyRequest = 34; + optional ListKeysRequest listKeysRequest = 35; + optional CommitKeyRequest commitKeyRequest = 36; + optional AllocateBlockRequest allocateBlockRequest = 37; + + optional S3CreateBucketRequest createS3BucketRequest = 41; + optional S3DeleteBucketRequest deleteS3BucketRequest = 42; + optional S3BucketInfoRequest infoS3BucketRequest = 43; + optional S3ListBucketsRequest listS3BucketsRequest = 44; + optional MultipartInfoInitiateRequest initiateMultiPartUploadRequest = 45; + optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest = 46; + + optional ServiceListRequest serviceListRequest = 51; +} + +message OMResponse { + required Type cmdType = 1; // Type of the command + + // A string that identifies this command, we generate Trace ID in Ozone + // frontend and this allows us to trace that command all over ozone. + optional string traceID = 2; + + optional bool success = 3 [default=true]; + optional string message = 4; + + optional CreateVolumeResponse createVolumeResponse = 11; + optional SetVolumePropertyResponse setVolumePropertyResponse = 12; + optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13; + optional InfoVolumeResponse infoVolumeResponse = 14; + optional DeleteVolumeResponse deleteVolumeResponse = 15; + optional ListVolumeResponse listVolumeResponse = 16; + + optional CreateBucketResponse createBucketResponse = 21; + optional InfoBucketResponse infoBucketResponse = 22; + optional SetBucketPropertyResponse setBucketPropertyResponse = 23; + optional DeleteBucketResponse deleteBucketResponse = 24; + optional ListBucketsResponse listBucketsResponse = 25; + + optional CreateKeyResponse createKeyResponse = 31; + optional LookupKeyResponse lookupKeyResponse = 32; + optional RenameKeyResponse renameKeyResponse = 33; + optional DeleteKeyResponse deleteKeyResponse = 34; + optional ListKeysResponse listKeysResponse = 35; + optional CommitKeyResponse commitKeyResponse = 36; + optional AllocateBlockResponse allocateBlockResponse = 37; + + optional S3CreateBucketResponse createS3BucketResponse = 41; + optional S3DeleteBucketResponse deleteS3BucketResponse = 42; + optional S3BucketInfoResponse infoS3BucketResponse = 43; + optional S3ListBucketsResponse listS3BucketsResponse = 44; + optional MultipartInfoInitiateResponse initiateMultiPartUploadResponse = 45; + optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse = 46; + + optional ServiceListResponse ServiceListResponse = 51; +} + enum Status { OK = 1; VOLUME_NOT_UNIQUE = 2; @@ -84,7 +198,6 @@ message CreateVolumeRequest { } message CreateVolumeResponse { - required Status status = 1; } @@ -130,7 +243,6 @@ message InfoVolumeRequest { message InfoVolumeResponse { required Status status = 1; optional VolumeInfo volumeInfo = 2; - } /** @@ -226,6 +338,23 @@ message InfoBucketResponse { optional BucketInfo bucketInfo = 2; } +message SetBucketPropertyRequest { + required BucketArgs bucketArgs = 1; +} + +message SetBucketPropertyResponse { + required Status status = 1; +} + +message DeleteBucketRequest { + required string volumeName = 1; + required string bucketName = 2; +} + +message DeleteBucketResponse { + required Status status = 1; +} + message ListBucketsRequest { required string volumeName = 1; optional string startKey = 2; @@ -277,11 +406,24 @@ message KeyInfo { optional uint64 latestVersion = 10; } -message LocateKeyRequest { +message CreateKeyRequest { required KeyArgs keyArgs = 1; } -message LocateKeyResponse { +message CreateKeyResponse { + required Status status = 1; + optional KeyInfo keyInfo = 2; + // clients' followup request may carry this ID for stateful operations + // (similar to a cookie). + optional uint64 ID = 3; + optional uint64 openVersion = 4; +} + +message LookupKeyRequest { + required KeyArgs keyArgs = 1; +} + +message LookupKeyResponse { required Status status = 1; optional KeyInfo keyInfo = 2; // clients' followup request may carry this ID for stateful operations (similar @@ -291,14 +433,6 @@ message LocateKeyResponse { optional uint64 openVersion = 4; } -message SetBucketPropertyRequest { - required BucketArgs bucketArgs = 1; -} - -message SetBucketPropertyResponse { - required Status status = 1; -} - message RenameKeyRequest{ required KeyArgs keyArgs = 1; required string toKeyName = 2; @@ -308,13 +442,17 @@ message RenameKeyResponse{ required Status status = 1; } -message DeleteBucketRequest { - required string volumeName = 1; - required string bucketName = 2; +message DeleteKeyRequest { + required KeyArgs keyArgs = 1; } -message DeleteBucketResponse { +message DeleteKeyResponse { required Status status = 1; + optional KeyInfo keyInfo = 2; + // clients' followup request may carry this ID for stateful operations + // (similar to a cookie). + optional uint64 ID = 3; + optional uint64 openVersion = 4; } message ListKeysRequest { @@ -330,6 +468,15 @@ message ListKeysResponse { repeated KeyInfo keyInfo = 2; } +message CommitKeyRequest { + required KeyArgs keyArgs = 1; + required uint64 clientID = 2; +} + +message CommitKeyResponse { + required Status status = 1; +} + message AllocateBlockRequest { required KeyArgs keyArgs = 1; required uint64 clientID = 2; @@ -340,15 +487,6 @@ message AllocateBlockResponse { optional KeyLocation keyLocation = 2; } -message CommitKeyRequest { - required KeyArgs keyArgs = 1; - required uint64 clientID = 2; -} - -message CommitKeyResponse { - required Status status = 1; -} - message ServiceListRequest { } @@ -374,12 +512,20 @@ message ServiceInfo { repeated ServicePort servicePorts = 3; } -message S3BucketRequest { +message S3CreateBucketRequest { required string userName = 1; required string s3bucketname = 2; } -message S3BucketResponse { +message S3CreateBucketResponse { + required Status status = 1; +} + +message S3DeleteBucketRequest { + required string s3bucketName = 1; +} + +message S3DeleteBucketResponse { required Status status = 1; } @@ -391,14 +537,6 @@ message S3BucketInfoResponse { optional string ozoneMapping = 2; } -message S3DeleteBucketRequest { - required string s3bucketName = 1; -} - -message S3DeleteBucketResponse { - required Status status = 1; -} - message S3ListBucketsRequest { required string userName = 1; optional string startKey = 2; @@ -411,7 +549,6 @@ message S3ListBucketsResponse { repeated BucketInfo bucketInfo = 2; } - message MultipartInfoInitiateRequest { required KeyArgs keyArgs = 1; } @@ -446,147 +583,11 @@ message MultipartCommitUploadPartResponse { required Status status = 2; } - - /** The OM service that takes care of Ozone namespace. */ service OzoneManagerService { - - /** - Creates a Volume. - */ - rpc createVolume(CreateVolumeRequest) - returns(CreateVolumeResponse); - - /** - Allows modificiation of volume properties. - */ - rpc setVolumeProperty(SetVolumePropertyRequest) - returns (SetVolumePropertyResponse); - - /** - Checks if the specified volume is accesible by the specified user. - */ - rpc checkVolumeAccess(CheckVolumeAccessRequest) - returns (CheckVolumeAccessResponse); - - /** - Gets Volume information. - */ - rpc infoVolume(InfoVolumeRequest) - returns(InfoVolumeResponse); - /** - Deletes a volume if it is empty. - */ - rpc deleteVolume(DeleteVolumeRequest) - returns (DeleteVolumeResponse); - - /** - Lists Volumes - */ - rpc listVolumes(ListVolumeRequest) - returns (ListVolumeResponse); - - /** - Creates a Bucket. - */ - rpc createBucket(CreateBucketRequest) - returns(CreateBucketResponse); - - /** - Get Bucket information. - */ - rpc infoBucket(InfoBucketRequest) - returns(InfoBucketResponse); - - /** - Sets bucket properties. - */ - rpc setBucketProperty(SetBucketPropertyRequest) - returns(SetBucketPropertyResponse); - - /** - Get key. - */ - rpc createKey(LocateKeyRequest) - returns(LocateKeyResponse); - - /** - Look up for an existing key. - */ - rpc lookupKey(LocateKeyRequest) - returns(LocateKeyResponse); - - /** - Rename an existing key within a bucket. - */ - rpc renameKey(RenameKeyRequest) - returns(RenameKeyResponse); - - /** - Delete an existing key. - */ - rpc deleteKey(LocateKeyRequest) - returns(LocateKeyResponse); - - /** - Deletes a bucket from volume if it is empty. - */ - rpc deleteBucket(DeleteBucketRequest) - returns (DeleteBucketResponse); - - /** - List Buckets. - */ - rpc listBuckets(ListBucketsRequest) - returns(ListBucketsResponse); - - /** - List Keys. - */ - rpc listKeys(ListKeysRequest) - returns(ListKeysResponse); - - /** - Commit a key. - */ - rpc commitKey(CommitKeyRequest) - returns(CommitKeyResponse); - - /** - Allocate a new block for a key. - */ - rpc allocateBlock(AllocateBlockRequest) - returns(AllocateBlockResponse); - - /** - Returns list of Ozone services with its configuration details. - */ - rpc getServiceList(ServiceListRequest) - returns(ServiceListResponse); - - /** - Creates an S3 bucket and creates an ozone mapping for that bucket. - */ - rpc createS3Bucket(S3BucketRequest) - returns(S3BucketResponse); - - rpc deleteS3Bucket(S3DeleteBucketRequest) - returns(S3DeleteBucketResponse); - - /** - Gets the Ozone Mapping information for the S3Bucket. - */ - rpc getS3Bucketinfo(S3BucketInfoRequest) - returns(S3BucketInfoResponse); - - rpc listS3Buckets(S3ListBucketsRequest) - returns(S3ListBucketsResponse); - - rpc initiateMultiPartUpload(MultipartInfoInitiateRequest) - returns (MultipartInfoInitiateResponse); - - rpc commitMultipartUploadPart(MultipartCommitUploadPartRequest) - returns (MultipartCommitUploadPartResponse); + // A client-to-OM RPC to send client requests to OM Ratis server + rpc submitRequest(OMRequest) + returns(OMResponse); } diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java index f56cbe8223..c786bd0422 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ratis.protocol.ClientId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public final class ObjectStoreHandler implements Closeable { private final ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient; private final StorageHandler storageHandler; + private ClientId clientId = ClientId.randomId(); /** * Creates a new ObjectStoreHandler. @@ -117,7 +119,7 @@ public ObjectStoreHandler(Configuration conf) throws IOException { RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); + Client.getRpcTimeout(conf)), clientId.toString()); storageHandler = new DistributedStorageHandler( new OzoneConfiguration(conf), diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 2b41007279..6d9801b8c2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -265,6 +265,9 @@ public LifeCycle.State getServerState() { return server.getLifeCycleState(); } + /** + * Get the local directory where ratis logs will be stored. + */ public static String getOMRatisDirectory(Configuration conf) { String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 4490921ab0..5e33dbf799 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -49,6 +49,10 @@ .CreateBucketRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .CreateBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CreateKeyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .CreateVolumeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -57,6 +61,10 @@ .DeleteBucketRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .DeleteBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .DeleteKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .DeleteKeyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .DeleteVolumeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -84,9 +92,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .ListVolumeResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .LocateKeyRequest; + .LookupKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .LocateKeyResponse; + .LookupKeyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartCommitUploadPartRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -95,6 +103,10 @@ .MultipartInfoInitiateRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartInfoInitiateResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .RenameKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -104,9 +116,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .S3BucketInfoResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .S3BucketRequest; + .S3CreateBucketRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .S3BucketResponse; + .S3CreateBucketResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .S3DeleteBucketRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -129,6 +141,8 @@ .SetVolumePropertyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .Status; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +171,154 @@ public OzoneManagerProtocolServerSideTranslatorPB( this.impl = impl; } + /** + * Submit requests to Ratis server for OM HA implmentation. + * TODO: Once HA is implemented fully, we should have only one server side + * translator for OM protocol. + */ + @Override + public OMResponse submitRequest(RpcController controller, + OMRequest request) throws ServiceException { + Type cmdType = request.getCmdType(); + + OMResponse.Builder responseBuilder = OMResponse.newBuilder() + .setCmdType(cmdType); + switch (cmdType) { + case CreateVolume: + CreateVolumeResponse createVolumeResponse = createVolume( + request.getCreateVolumeRequest()); + responseBuilder.setCreateVolumeResponse(createVolumeResponse); + break; + case SetVolumeProperty: + SetVolumePropertyResponse setVolumePropertyResponse = setVolumeProperty( + request.getSetVolumePropertyRequest()); + responseBuilder.setSetVolumePropertyResponse(setVolumePropertyResponse); + break; + case CheckVolumeAccess: + CheckVolumeAccessResponse checkVolumeAccessResponse = checkVolumeAccess( + request.getCheckVolumeAccessRequest()); + responseBuilder.setCheckVolumeAccessResponse(checkVolumeAccessResponse); + break; + case InfoVolume: + InfoVolumeResponse infoVolumeResponse = infoVolume( + request.getInfoVolumeRequest()); + responseBuilder.setInfoVolumeResponse(infoVolumeResponse); + break; + case DeleteVolume: + DeleteVolumeResponse deleteVolumeResponse = deleteVolume( + request.getDeleteVolumeRequest()); + responseBuilder.setDeleteVolumeResponse(deleteVolumeResponse); + break; + case ListVolume: + ListVolumeResponse listVolumeResponse = listVolumes( + request.getListVolumeRequest()); + responseBuilder.setListVolumeResponse(listVolumeResponse); + break; + case CreateBucket: + CreateBucketResponse createBucketResponse = createBucket( + request.getCreateBucketRequest()); + responseBuilder.setCreateBucketResponse(createBucketResponse); + break; + case InfoBucket: + InfoBucketResponse infoBucketResponse = infoBucket( + request.getInfoBucketRequest()); + responseBuilder.setInfoBucketResponse(infoBucketResponse); + break; + case SetBucketProperty: + SetBucketPropertyResponse setBucketPropertyResponse = setBucketProperty( + request.getSetBucketPropertyRequest()); + responseBuilder.setSetBucketPropertyResponse(setBucketPropertyResponse); + break; + case DeleteBucket: + DeleteBucketResponse deleteBucketResponse = deleteBucket( + request.getDeleteBucketRequest()); + responseBuilder.setDeleteBucketResponse(deleteBucketResponse); + break; + case ListBuckets: + ListBucketsResponse listBucketsResponse = listBuckets( + request.getListBucketsRequest()); + responseBuilder.setListBucketsResponse(listBucketsResponse); + break; + case CreateKey: + CreateKeyResponse createKeyResponse = createKey( + request.getCreateKeyRequest()); + responseBuilder.setCreateKeyResponse(createKeyResponse); + break; + case LookupKey: + LookupKeyResponse lookupKeyResponse = lookupKey( + request.getLookupKeyRequest()); + responseBuilder.setLookupKeyResponse(lookupKeyResponse); + break; + case RenameKey: + RenameKeyResponse renameKeyResponse = renameKey( + request.getRenameKeyRequest()); + responseBuilder.setRenameKeyResponse(renameKeyResponse); + break; + case DeleteKey: + DeleteKeyResponse deleteKeyResponse = deleteKey( + request.getDeleteKeyRequest()); + responseBuilder.setDeleteKeyResponse(deleteKeyResponse); + break; + case ListKeys: + ListKeysResponse listKeysResponse = listKeys( + request.getListKeysRequest()); + responseBuilder.setListKeysResponse(listKeysResponse); + break; + case CommitKey: + CommitKeyResponse commitKeyResponse = commitKey( + request.getCommitKeyRequest()); + responseBuilder.setCommitKeyResponse(commitKeyResponse); + break; + case AllocateBlock: + AllocateBlockResponse allocateBlockResponse = allocateBlock( + request.getAllocateBlockRequest()); + responseBuilder.setAllocateBlockResponse(allocateBlockResponse); + break; + case CreateS3Bucket: + S3CreateBucketResponse s3CreateBucketResponse = createS3Bucket( + request.getCreateS3BucketRequest()); + responseBuilder.setCreateS3BucketResponse(s3CreateBucketResponse); + break; + case DeleteS3Bucket: + S3DeleteBucketResponse s3DeleteBucketResponse = deleteS3Bucket( + request.getDeleteS3BucketRequest()); + responseBuilder.setDeleteS3BucketResponse(s3DeleteBucketResponse); + break; + case InfoS3Bucket: + S3BucketInfoResponse s3BucketInfoResponse = getS3Bucketinfo( + request.getInfoS3BucketRequest()); + responseBuilder.setInfoS3BucketResponse(s3BucketInfoResponse); + break; + case ListS3Buckets: + S3ListBucketsResponse s3ListBucketsResponse = listS3Buckets( + request.getListS3BucketsRequest()); + responseBuilder.setListS3BucketsResponse(s3ListBucketsResponse); + break; + case InitiateMultiPartUpload: + MultipartInfoInitiateResponse multipartInfoInitiateResponse = + initiateMultiPartUpload(request.getInitiateMultiPartUploadRequest()); + responseBuilder.setInitiateMultiPartUploadResponse( + multipartInfoInitiateResponse); + break; + case CommitMultiPartUpload: + MultipartCommitUploadPartResponse commitUploadPartResponse = + commitMultipartUploadPart(request.getCommitMultiPartUploadRequest()); + responseBuilder.setCommitMultiPartUploadResponse( + commitUploadPartResponse); + break; + case ServiceList: + ServiceListResponse serviceListResponse = getServiceList( + request.getServiceListRequest()); + responseBuilder.setServiceListResponse(serviceListResponse); + break; + default: + responseBuilder.setSuccess(false); + responseBuilder.setMessage("Unrecognized Command Type: " + cmdType); + break; + } + return responseBuilder.build(); + } + // Convert and exception to corresponding status code private Status exceptionToResponseStatus(IOException ex) { if (ex instanceof OMException) { @@ -218,10 +380,7 @@ private Status exceptionToResponseStatus(IOException ex) { } } - @Override - public CreateVolumeResponse createVolume( - RpcController controller, CreateVolumeRequest request) - throws ServiceException { + private CreateVolumeResponse createVolume(CreateVolumeRequest request) { CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder(); resp.setStatus(Status.OK); try { @@ -232,10 +391,8 @@ public CreateVolumeResponse createVolume( return resp.build(); } - @Override - public SetVolumePropertyResponse setVolumeProperty( - RpcController controller, SetVolumePropertyRequest request) - throws ServiceException { + private SetVolumePropertyResponse setVolumeProperty( + SetVolumePropertyRequest request) { SetVolumePropertyResponse.Builder resp = SetVolumePropertyResponse.newBuilder(); resp.setStatus(Status.OK); @@ -255,10 +412,8 @@ public SetVolumePropertyResponse setVolumeProperty( return resp.build(); } - @Override - public CheckVolumeAccessResponse checkVolumeAccess( - RpcController controller, CheckVolumeAccessRequest request) - throws ServiceException { + private CheckVolumeAccessResponse checkVolumeAccess( + CheckVolumeAccessRequest request) { CheckVolumeAccessResponse.Builder resp = CheckVolumeAccessResponse.newBuilder(); resp.setStatus(Status.OK); @@ -276,10 +431,7 @@ public CheckVolumeAccessResponse checkVolumeAccess( return resp.build(); } - @Override - public InfoVolumeResponse infoVolume( - RpcController controller, InfoVolumeRequest request) - throws ServiceException { + private InfoVolumeResponse infoVolume(InfoVolumeRequest request) { InfoVolumeResponse.Builder resp = InfoVolumeResponse.newBuilder(); resp.setStatus(Status.OK); String volume = request.getVolumeName(); @@ -292,10 +444,7 @@ public InfoVolumeResponse infoVolume( return resp.build(); } - @Override - public DeleteVolumeResponse deleteVolume( - RpcController controller, DeleteVolumeRequest request) - throws ServiceException { + private DeleteVolumeResponse deleteVolume(DeleteVolumeRequest request) { DeleteVolumeResponse.Builder resp = DeleteVolumeResponse.newBuilder(); resp.setStatus(Status.OK); try { @@ -306,9 +455,7 @@ public DeleteVolumeResponse deleteVolume( return resp.build(); } - @Override - public ListVolumeResponse listVolumes( - RpcController controller, ListVolumeRequest request) + private ListVolumeResponse listVolumes(ListVolumeRequest request) throws ServiceException { ListVolumeResponse.Builder resp = ListVolumeResponse.newBuilder(); List result = Lists.newArrayList(); @@ -336,10 +483,7 @@ public ListVolumeResponse listVolumes( return resp.build(); } - @Override - public CreateBucketResponse createBucket( - RpcController controller, CreateBucketRequest - request) throws ServiceException { + private CreateBucketResponse createBucket(CreateBucketRequest request) { CreateBucketResponse.Builder resp = CreateBucketResponse.newBuilder(); try { @@ -352,10 +496,7 @@ public CreateBucketResponse createBucket( return resp.build(); } - @Override - public InfoBucketResponse infoBucket( - RpcController controller, InfoBucketRequest request) - throws ServiceException { + private InfoBucketResponse infoBucket(InfoBucketRequest request) { InfoBucketResponse.Builder resp = InfoBucketResponse.newBuilder(); try { @@ -369,12 +510,9 @@ public InfoBucketResponse infoBucket( return resp.build(); } - @Override - public LocateKeyResponse createKey( - RpcController controller, LocateKeyRequest request - ) throws ServiceException { - LocateKeyResponse.Builder resp = - LocateKeyResponse.newBuilder(); + private CreateKeyResponse createKey(CreateKeyRequest request) { + CreateKeyResponse.Builder resp = + CreateKeyResponse.newBuilder(); try { KeyArgs keyArgs = request.getKeyArgs(); HddsProtos.ReplicationType type = @@ -408,12 +546,9 @@ public LocateKeyResponse createKey( return resp.build(); } - @Override - public LocateKeyResponse lookupKey( - RpcController controller, LocateKeyRequest request - ) throws ServiceException { - LocateKeyResponse.Builder resp = - LocateKeyResponse.newBuilder(); + private LookupKeyResponse lookupKey(LookupKeyRequest request) { + LookupKeyResponse.Builder resp = + LookupKeyResponse.newBuilder(); try { KeyArgs keyArgs = request.getKeyArgs(); OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() @@ -430,10 +565,7 @@ public LocateKeyResponse lookupKey( return resp.build(); } - @Override - public RenameKeyResponse renameKey( - RpcController controller, RenameKeyRequest request) - throws ServiceException { + private RenameKeyResponse renameKey(RenameKeyRequest request) { RenameKeyResponse.Builder resp = RenameKeyResponse.newBuilder(); try { KeyArgs keyArgs = request.getKeyArgs(); @@ -450,10 +582,8 @@ public RenameKeyResponse renameKey( return resp.build(); } - @Override - public SetBucketPropertyResponse setBucketProperty( - RpcController controller, SetBucketPropertyRequest request) - throws ServiceException { + private SetBucketPropertyResponse setBucketProperty( + SetBucketPropertyRequest request) { SetBucketPropertyResponse.Builder resp = SetBucketPropertyResponse.newBuilder(); try { @@ -466,11 +596,9 @@ public SetBucketPropertyResponse setBucketProperty( return resp.build(); } - @Override - public LocateKeyResponse deleteKey(RpcController controller, - LocateKeyRequest request) throws ServiceException { - LocateKeyResponse.Builder resp = - LocateKeyResponse.newBuilder(); + private DeleteKeyResponse deleteKey(DeleteKeyRequest request) { + DeleteKeyResponse.Builder resp = + DeleteKeyResponse.newBuilder(); try { KeyArgs keyArgs = request.getKeyArgs(); OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() @@ -486,10 +614,7 @@ public LocateKeyResponse deleteKey(RpcController controller, return resp.build(); } - @Override - public DeleteBucketResponse deleteBucket( - RpcController controller, DeleteBucketRequest request) - throws ServiceException { + private DeleteBucketResponse deleteBucket(DeleteBucketRequest request) { DeleteBucketResponse.Builder resp = DeleteBucketResponse.newBuilder(); resp.setStatus(Status.OK); try { @@ -500,10 +625,7 @@ public DeleteBucketResponse deleteBucket( return resp.build(); } - @Override - public ListBucketsResponse listBuckets( - RpcController controller, ListBucketsRequest request) - throws ServiceException { + private ListBucketsResponse listBuckets(ListBucketsRequest request) { ListBucketsResponse.Builder resp = ListBucketsResponse.newBuilder(); try { @@ -522,9 +644,7 @@ public ListBucketsResponse listBuckets( return resp.build(); } - @Override - public ListKeysResponse listKeys(RpcController controller, - ListKeysRequest request) throws ServiceException { + private ListKeysResponse listKeys(ListKeysRequest request) { ListKeysResponse.Builder resp = ListKeysResponse.newBuilder(); try { @@ -544,9 +664,7 @@ public ListKeysResponse listKeys(RpcController controller, return resp.build(); } - @Override - public CommitKeyResponse commitKey(RpcController controller, - CommitKeyRequest request) throws ServiceException { + private CommitKeyResponse commitKey(CommitKeyRequest request) { CommitKeyResponse.Builder resp = CommitKeyResponse.newBuilder(); try { @@ -574,9 +692,7 @@ public CommitKeyResponse commitKey(RpcController controller, return resp.build(); } - @Override - public AllocateBlockResponse allocateBlock(RpcController controller, - AllocateBlockRequest request) throws ServiceException { + private AllocateBlockResponse allocateBlock(AllocateBlockRequest request) { AllocateBlockResponse.Builder resp = AllocateBlockResponse.newBuilder(); try { @@ -596,9 +712,7 @@ public AllocateBlockResponse allocateBlock(RpcController controller, return resp.build(); } - @Override - public ServiceListResponse getServiceList(RpcController controller, - ServiceListRequest request) throws ServiceException { + private ServiceListResponse getServiceList(ServiceListRequest request) { ServiceListResponse.Builder resp = ServiceListResponse.newBuilder(); try { resp.addAllServiceInfo(impl.getServiceList().stream() @@ -611,10 +725,8 @@ public ServiceListResponse getServiceList(RpcController controller, return resp.build(); } - @Override - public S3BucketResponse createS3Bucket(RpcController controller, - S3BucketRequest request) throws ServiceException { - S3BucketResponse.Builder resp = S3BucketResponse.newBuilder(); + private S3CreateBucketResponse createS3Bucket(S3CreateBucketRequest request) { + S3CreateBucketResponse.Builder resp = S3CreateBucketResponse.newBuilder(); try { impl.createS3Bucket(request.getUserName(), request.getS3Bucketname()); resp.setStatus(Status.OK); @@ -624,10 +736,7 @@ public S3BucketResponse createS3Bucket(RpcController controller, return resp.build(); } - @Override - public S3DeleteBucketResponse deleteS3Bucket(RpcController controller, - S3DeleteBucketRequest request) throws - ServiceException { + private S3DeleteBucketResponse deleteS3Bucket(S3DeleteBucketRequest request) { S3DeleteBucketResponse.Builder resp = S3DeleteBucketResponse.newBuilder(); try { impl.deleteS3Bucket(request.getS3BucketName()); @@ -638,9 +747,7 @@ public S3DeleteBucketResponse deleteS3Bucket(RpcController controller, return resp.build(); } - @Override - public S3BucketInfoResponse getS3Bucketinfo(RpcController controller, - S3BucketInfoRequest request) throws ServiceException { + private S3BucketInfoResponse getS3Bucketinfo(S3BucketInfoRequest request) { S3BucketInfoResponse.Builder resp = S3BucketInfoResponse.newBuilder(); try { resp.setOzoneMapping( @@ -652,9 +759,7 @@ public S3BucketInfoResponse getS3Bucketinfo(RpcController controller, return resp.build(); } - @Override - public S3ListBucketsResponse listS3Buckets(RpcController controller, - S3ListBucketsRequest request) { + private S3ListBucketsResponse listS3Buckets(S3ListBucketsRequest request) { S3ListBucketsResponse.Builder resp = S3ListBucketsResponse.newBuilder(); try { List buckets = impl.listS3Buckets( @@ -672,9 +777,8 @@ public S3ListBucketsResponse listS3Buckets(RpcController controller, return resp.build(); } - @Override - public MultipartInfoInitiateResponse initiateMultiPartUpload( - RpcController controller, MultipartInfoInitiateRequest request) { + private MultipartInfoInitiateResponse initiateMultiPartUpload( + MultipartInfoInitiateRequest request) { MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse .newBuilder(); try { @@ -698,9 +802,8 @@ public MultipartInfoInitiateResponse initiateMultiPartUpload( return resp.build(); } - @Override - public MultipartCommitUploadPartResponse commitMultipartUploadPart( - RpcController controller, MultipartCommitUploadPartRequest request) { + private MultipartCommitUploadPartResponse commitMultipartUploadPart( + MultipartCommitUploadPartRequest request) { MultipartCommitUploadPartResponse.Builder resp = MultipartCommitUploadPartResponse.newBuilder(); try { @@ -722,5 +825,4 @@ public MultipartCommitUploadPartResponse commitMultipartUploadPart( } return resp.build(); } - }