From b95a58e231cb1ac87680a072d7fa84998fd95921 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 21 Jun 2019 11:35:32 +0200 Subject: [PATCH] HDDS-1674. Make ScmBlockLocationProtocol message type based. Closes #984 --- ...ocationProtocolClientSideTranslatorPB.java | 43 ++++++++++- ...ocationProtocolServerSideTranslatorPB.java | 65 +++++++++++++--- .../main/proto/ScmBlockLocationProtocol.proto | 76 ++++++++++++++----- 3 files changed, 151 insertions(+), 33 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index 559022f286..aadf585775 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -25,6 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto; @@ -72,6 +75,16 @@ public ScmBlockLocationProtocolClientSideTranslatorPB( this.rpcProxy = rpcProxy; } + /** + * Returns a SCMBlockLocationRequest builder with specified type. + * @param cmdType type of the request + */ + private SCMBlockLocationRequest.Builder createSCMBlockRequest(Type cmdType) { + return SCMBlockLocationRequest.newBuilder() + .setCmdType(cmdType) + .setTraceID(TracingUtil.exportCurrentSpan()); + } + /** * Asks SCM where a block should be allocated. SCM responds with the * set of datanodes that should be used creating this block. @@ -96,12 +109,19 @@ public List allocateBlock(long size, int num, .setType(type) .setFactor(factor) .setOwner(owner) - .setTraceID(TracingUtil.exportCurrentSpan()) .setExcludeList(excludeList.getProtoBuf()) .build(); + + SCMBlockLocationRequest wrapper = createSCMBlockRequest( + Type.AllocateScmBlock) + .setAllocateScmBlockRequest(request) + .build(); + final AllocateScmBlockResponseProto response; + final SCMBlockLocationResponse wrappedResponse; try { - response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request); + wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper); + response = wrappedResponse.getAllocateScmBlockResponse(); } catch (ServiceException e) { throw transformServiceException(e); } @@ -141,9 +161,16 @@ public List deleteKeyBlocks( .addAllKeyBlocks(keyBlocksProto) .build(); + SCMBlockLocationRequest wrapper = createSCMBlockRequest( + Type.DeleteScmKeyBlocks) + .setDeleteScmKeyBlocksRequest(request) + .build(); + final DeleteScmKeyBlocksResponseProto resp; + final SCMBlockLocationResponse wrappedResponse; try { - resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request); + wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper); + resp = wrappedResponse.getDeleteScmKeyBlocksResponse(); } catch (ServiceException e) { throw transformServiceException(e); } @@ -191,8 +218,16 @@ public ScmInfo getScmInfo() throws IOException { HddsProtos.GetScmInfoRequestProto request = HddsProtos.GetScmInfoRequestProto.getDefaultInstance(); HddsProtos.GetScmInfoResponseProto resp; + + SCMBlockLocationRequest wrapper = createSCMBlockRequest( + Type.GetScmInfo) + .setGetScmInfoRequest(request) + .build(); + + final SCMBlockLocationResponse wrappedResponse; try { - resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request); + wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper); + resp = wrappedResponse.getGetScmInfoResponse(); } catch (ServiceException e) { throw transformServiceException(e); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java index 65f0a973ce..db1240a5f6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -22,6 +22,7 @@ import io.opentracing.Scope; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .AllocateBlockResponse; import org.apache.hadoop.hdds.scm.ScmInfo; @@ -42,6 +43,12 @@ .DeleteScmKeyBlocksRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .DeleteScmKeyBlocksResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .SCMBlockLocationResponse; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .SCMBlockLocationRequest; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .Status; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; @@ -71,13 +78,51 @@ public ScmBlockLocationProtocolServerSideTranslatorPB( this.impl = impl; } + + private SCMBlockLocationResponse.Builder createSCMBlockResponse( + ScmBlockLocationProtocolProtos.Type cmdType, + String traceID) { + return SCMBlockLocationResponse.newBuilder() + .setCmdType(cmdType) + .setTraceID(traceID); + } + @Override + public SCMBlockLocationResponse send(RpcController controller, + SCMBlockLocationRequest request) throws ServiceException { + String traceId = request.getTraceID(); + + SCMBlockLocationResponse.Builder response = createSCMBlockResponse( + request.getCmdType(), + traceId); + + switch (request.getCmdType()) { + case AllocateScmBlock: + response.setAllocateScmBlockResponse( + allocateScmBlock(traceId, request.getAllocateScmBlockRequest())); + break; + case DeleteScmKeyBlocks: + response.setDeleteScmKeyBlocksResponse( + deleteScmKeyBlocks(traceId, request.getDeleteScmKeyBlocksRequest())); + break; + case GetScmInfo: + response.setGetScmInfoResponse( + getScmInfo(traceId, request.getGetScmInfoRequest())); + break; + default: + throw new ServiceException("Unknown Operation"); + } + response.setSuccess(true) + .setStatus(Status.OK); + return response.build(); + } + public AllocateScmBlockResponseProto allocateScmBlock( - RpcController controller, AllocateScmBlockRequestProto request) + String traceId, AllocateScmBlockRequestProto request) throws ServiceException { - try (Scope scope = TracingUtil + try(Scope scope = TracingUtil .importAndCreateScope("ScmBlockLocationProtocol.allocateBlock", - request.getTraceID())) { + traceId)) { List allocatedBlocks = impl.allocateBlock(request.getSize(), request.getNumBlocks(), request.getType(), @@ -107,13 +152,14 @@ public AllocateScmBlockResponseProto allocateScmBlock( } } - @Override public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks( - RpcController controller, DeleteScmKeyBlocksRequestProto req) + String traceId, DeleteScmKeyBlocksRequestProto req) throws ServiceException { DeleteScmKeyBlocksResponseProto.Builder resp = DeleteScmKeyBlocksResponseProto.newBuilder(); - try { + try(Scope scope = TracingUtil + .importAndCreateScope("ScmBlockLocationProtocol.deleteKeyBlocks", + traceId)) { List infoList = req.getKeyBlocksList().stream() .map(BlockGroup::getFromProto).collect(Collectors.toList()); final List results = @@ -132,12 +178,13 @@ public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks( return resp.build(); } - @Override public HddsProtos.GetScmInfoResponseProto getScmInfo( - RpcController controller, HddsProtos.GetScmInfoRequestProto req) + String traceId, HddsProtos.GetScmInfoRequestProto req) throws ServiceException { ScmInfo scmInfo; - try { + try(Scope scope = TracingUtil + .importAndCreateScope("ScmBlockLocationProtocol.getInfo", + traceId)) { scmInfo = impl.getScmInfo(); } catch (IOException ex) { throw new ServiceException(ex); diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index 6745c6ee14..8222d8b45b 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -33,6 +33,60 @@ import "hdds.proto"; // SCM Block protocol +enum Type { + AllocateScmBlock = 11; + DeleteScmKeyBlocks = 12; + GetScmInfo = 13; +} + +message SCMBlockLocationRequest { + required Type cmdType = 1; // Type of the command + + // A string that identifies this command, we generate Trace ID in Ozone + // frontend and this allows us to trace that command all over ozone. + optional string traceID = 2; + + optional UserInfo userInfo = 3; + + optional AllocateScmBlockRequestProto allocateScmBlockRequest = 11; + optional DeleteScmKeyBlocksRequestProto deleteScmKeyBlocksRequest = 12; + optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13; +} + +message SCMBlockLocationResponse { + required Type cmdType = 1; // Type of the command + + // A string that identifies this command, we generate Trace ID in Ozone + // frontend and this allows us to trace that command all over ozone. + optional string traceID = 2; + + optional bool success = 3 [default=true]; + + optional string message = 4; + + required Status status = 5; + + optional string leaderOMNodeId = 6; + + optional AllocateScmBlockResponseProto allocateScmBlockResponse = 11; + optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12; + optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13; +} + +/** + User information which will be extracted during RPC context and used + during validating Acl. +*/ +message UserInfo { + optional string userName = 1; + optional string remoteAddress = 3; +} + +enum Status { + OK = 1; + UNKNOWN = 2; +} + /** * Request send to SCM asking allocate block of specified size. */ @@ -42,7 +96,6 @@ message AllocateScmBlockRequestProto { required ReplicationType type = 3; required hadoop.hdds.ReplicationFactor factor = 4; required string owner = 5; - optional string traceID = 6; optional ExcludeListProto excludeList = 7; } @@ -73,8 +126,6 @@ message KeyBlocks { */ message DeleteScmKeyBlocksResponseProto { repeated DeleteKeyBlocksResultProto results = 1; - optional string traceID = 2; - } /** @@ -122,21 +173,6 @@ message AllocateScmBlockResponseProto { */ service ScmBlockLocationProtocolService { - /** - * Creates a block entry in SCM. - */ - rpc allocateScmBlock(AllocateScmBlockRequestProto) - returns (AllocateScmBlockResponseProto); - - /** - * Deletes blocks for a set of object keys from SCM. - */ - rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto) - returns (DeleteScmKeyBlocksResponseProto); - - /** - * Gets the scmInfo from SCM. - */ - rpc getScmInfo(hadoop.hdds.GetScmInfoRequestProto) - returns (hadoop.hdds.GetScmInfoResponseProto); + rpc send(SCMBlockLocationRequest) + returns (SCMBlockLocationResponse); }