HDDS-1674. Make ScmBlockLocationProtocol message type based.

Closes #984
This commit is contained in:
S O'Donnell 2019-06-21 11:35:32 +02:00 committed by Márton Elek
parent 272b96d243
commit b95a58e231
No known key found for this signature in database
GPG Key ID: D51EA8F00EE79B28
3 changed files with 151 additions and 33 deletions

View File

@ -25,6 +25,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
@ -72,6 +75,16 @@ public ScmBlockLocationProtocolClientSideTranslatorPB(
this.rpcProxy = rpcProxy;
}
/**
 * Creates a new {@link SCMBlockLocationRequest} builder pre-populated with
 * the given command type and the currently active trace span.
 *
 * @param cmdType type of the request
 * @return a builder carrying {@code cmdType} and the exported trace ID
 */
private SCMBlockLocationRequest.Builder createSCMBlockRequest(Type cmdType) {
  SCMBlockLocationRequest.Builder builder =
      SCMBlockLocationRequest.newBuilder();
  builder.setCmdType(cmdType);
  builder.setTraceID(TracingUtil.exportCurrentSpan());
  return builder;
}
/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
@ -96,12 +109,19 @@ public List<AllocatedBlock> allocateBlock(long size, int num,
.setType(type)
.setFactor(factor)
.setOwner(owner)
.setTraceID(TracingUtil.exportCurrentSpan())
.setExcludeList(excludeList.getProtoBuf())
.build();
SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.AllocateScmBlock)
.setAllocateScmBlockRequest(request)
.build();
final AllocateScmBlockResponseProto response;
final SCMBlockLocationResponse wrappedResponse;
try {
response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
response = wrappedResponse.getAllocateScmBlockResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
@ -141,9 +161,16 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
.addAllKeyBlocks(keyBlocksProto)
.build();
SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.DeleteScmKeyBlocks)
.setDeleteScmKeyBlocksRequest(request)
.build();
final DeleteScmKeyBlocksResponseProto resp;
final SCMBlockLocationResponse wrappedResponse;
try {
resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request);
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
resp = wrappedResponse.getDeleteScmKeyBlocksResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
@ -191,8 +218,16 @@ public ScmInfo getScmInfo() throws IOException {
HddsProtos.GetScmInfoRequestProto request =
HddsProtos.GetScmInfoRequestProto.getDefaultInstance();
HddsProtos.GetScmInfoResponseProto resp;
SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.GetScmInfo)
.setGetScmInfoRequest(request)
.build();
final SCMBlockLocationResponse wrappedResponse;
try {
resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request);
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
resp = wrappedResponse.getGetScmInfoResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}

View File

@ -22,6 +22,7 @@
import io.opentracing.Scope;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateBlockResponse;
import org.apache.hadoop.hdds.scm.ScmInfo;
@ -42,6 +43,12 @@
.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.Status;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@ -71,13 +78,51 @@ public ScmBlockLocationProtocolServerSideTranslatorPB(
this.impl = impl;
}
/**
 * Creates a {@link SCMBlockLocationResponse} builder pre-populated with the
 * command type and trace ID taken from the request being answered.
 *
 * @param cmdType command type echoed back to the client
 * @param traceID trace ID propagated from the request
 * @return a builder carrying {@code cmdType} and {@code traceID}
 */
private SCMBlockLocationResponse.Builder createSCMBlockResponse(
    ScmBlockLocationProtocolProtos.Type cmdType,
    String traceID) {
  SCMBlockLocationResponse.Builder builder =
      SCMBlockLocationResponse.newBuilder();
  builder.setCmdType(cmdType);
  builder.setTraceID(traceID);
  return builder;
}
/**
 * Dispatches a wrapped {@link SCMBlockLocationRequest} to the handler
 * matching its command type and wraps the handler's result in a
 * {@link SCMBlockLocationResponse}.
 *
 * @param controller unused RPC controller
 * @param request wrapper carrying the command type, trace ID and the
 *     type-specific request payload
 * @return wrapper response with the matching payload, {@code success=true}
 *     and {@code Status.OK}
 * @throws ServiceException if the command type is unknown or the
 *     underlying handler fails
 */
@Override
public SCMBlockLocationResponse send(RpcController controller,
    SCMBlockLocationRequest request) throws ServiceException {
  String traceId = request.getTraceID();
  SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
      request.getCmdType(),
      traceId);
  switch (request.getCmdType()) {
  case AllocateScmBlock:
    response.setAllocateScmBlockResponse(
        allocateScmBlock(traceId, request.getAllocateScmBlockRequest()));
    break;
  case DeleteScmKeyBlocks:
    response.setDeleteScmKeyBlocksResponse(
        deleteScmKeyBlocks(traceId, request.getDeleteScmKeyBlocksRequest()));
    break;
  case GetScmInfo:
    response.setGetScmInfoResponse(
        getScmInfo(traceId, request.getGetScmInfoRequest()));
    break;
  default:
    // Include the offending type so unknown/unsupported commands are easy
    // to diagnose from the exception the client receives.
    throw new ServiceException("Unknown Operation: " + request.getCmdType());
  }
  // Handler failures propagate as ServiceException above, so any request
  // reaching this point completed successfully.
  response.setSuccess(true)
      .setStatus(Status.OK);
  return response.build();
}
public AllocateScmBlockResponseProto allocateScmBlock(
RpcController controller, AllocateScmBlockRequestProto request)
String traceId, AllocateScmBlockRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.allocateBlock",
request.getTraceID())) {
traceId)) {
List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(), request.getType(),
@ -107,13 +152,14 @@ public AllocateScmBlockResponseProto allocateScmBlock(
}
}
@Override
public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
RpcController controller, DeleteScmKeyBlocksRequestProto req)
String traceId, DeleteScmKeyBlocksRequestProto req)
throws ServiceException {
DeleteScmKeyBlocksResponseProto.Builder resp =
DeleteScmKeyBlocksResponseProto.newBuilder();
try {
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.deleteKeyBlocks",
traceId)) {
List<BlockGroup> infoList = req.getKeyBlocksList().stream()
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
@ -132,12 +178,13 @@ public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
return resp.build();
}
@Override
public HddsProtos.GetScmInfoResponseProto getScmInfo(
RpcController controller, HddsProtos.GetScmInfoRequestProto req)
String traceId, HddsProtos.GetScmInfoRequestProto req)
throws ServiceException {
ScmInfo scmInfo;
try {
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.getInfo",
traceId)) {
scmInfo = impl.getScmInfo();
} catch (IOException ex) {
throw new ServiceException(ex);

View File

@ -33,6 +33,60 @@ import "hdds.proto";
// SCM Block protocol
// Commands understood by the ScmBlockLocationProtocol wrapper RPC. The
// value selects which payload field of SCMBlockLocationRequest /
// SCMBlockLocationResponse is populated (payload field numbers match).
enum Type {
AllocateScmBlock = 11;
DeleteScmKeyBlocks = 12;
GetScmInfo = 13;
}
// Wrapper for every ScmBlockLocationProtocol request: cmdType selects which
// of the optional payload fields (11-13) carries the actual request body.
message SCMBlockLocationRequest {
required Type cmdType = 1; // Type of the command
// A string that identifies this command, we generate Trace ID in Ozone
// frontend and this allows us to trace that command all over ozone.
optional string traceID = 2;
// Caller identity — presumably used for ACL validation; the server-side
// translator in this change does not read it. TODO confirm with callers.
optional UserInfo userInfo = 3;
// Only the payload matching cmdType is read by the server-side dispatch.
optional AllocateScmBlockRequestProto allocateScmBlockRequest = 11;
optional DeleteScmKeyBlocksRequestProto deleteScmKeyBlocksRequest = 12;
optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13;
}
// Wrapper for every ScmBlockLocationProtocol response: cmdType mirrors the
// request's type and selects which payload field (11-13) is populated.
message SCMBlockLocationResponse {
required Type cmdType = 1; // Type of the command
// A string that identifies this command, we generate Trace ID in Ozone
// frontend and this allows us to trace that command all over ozone.
optional string traceID = 2;
// The server-side translator sets success=true and status=OK for every
// dispatched command; handler failures surface as a ServiceException.
optional bool success = 3 [default=true];
optional string message = 4;
required Status status = 5;
// NOTE(review): not set by the server-side translator in this change —
// looks carried over from the OM wrapper response; confirm it is needed.
optional string leaderOMNodeId = 6;
// Only the payload matching cmdType is set by the server.
optional AllocateScmBlockResponseProto allocateScmBlockResponse = 11;
optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12;
optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
}
/**
User information which will be extracted from the RPC context and used
while validating ACLs.
*/
message UserInfo {
optional string userName = 1;
// Field number 2 is skipped — TODO confirm whether it was removed and
// should be marked reserved before the number is ever reused.
optional string remoteAddress = 3;
}
// Result code carried in SCMBlockLocationResponse.status. The server in
// this change always reports OK; errors are raised as ServiceException
// rather than encoded in the response.
enum Status {
OK = 1;
UNKNOWN = 2;
}
/**
* Request send to SCM asking allocate block of specified size.
*/
@ -42,7 +96,6 @@ message AllocateScmBlockRequestProto {
required ReplicationType type = 3;
required hadoop.hdds.ReplicationFactor factor = 4;
required string owner = 5;
optional string traceID = 6;
optional ExcludeListProto excludeList = 7;
}
@ -73,8 +126,6 @@ message KeyBlocks {
*/
message DeleteScmKeyBlocksResponseProto {
repeated DeleteKeyBlocksResultProto results = 1;
optional string traceID = 2;
}
/**
@ -122,21 +173,6 @@ message AllocateScmBlockResponseProto {
*/
service ScmBlockLocationProtocolService {
/**
* Creates a block entry in SCM.
*/
rpc allocateScmBlock(AllocateScmBlockRequestProto)
returns (AllocateScmBlockResponseProto);
/**
* Deletes blocks for a set of object keys from SCM.
*/
rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
returns (DeleteScmKeyBlocksResponseProto);
/**
* Gets the scmInfo from SCM.
*/
rpc getScmInfo(hadoop.hdds.GetScmInfoRequestProto)
returns (hadoop.hdds.GetScmInfoResponseProto);
rpc send(SCMBlockLocationRequest)
returns (SCMBlockLocationResponse);
}