HDDS-1151. Propagate the tracing id in ScmBlockLocationProtocol.
Contributed by Elek, Marton.
This commit is contained in:
parent
ba4e7bd192
commit
9de34d2990
@ -26,6 +26,7 @@
|
|||||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -34,7 +35,7 @@
|
|||||||
* to read/write a block.
|
* to read/write a block.
|
||||||
*/
|
*/
|
||||||
@KerberosInfo(serverPrincipal = ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
|
@KerberosInfo(serverPrincipal = ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
|
||||||
public interface ScmBlockLocationProtocol {
|
public interface ScmBlockLocationProtocol extends Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asks SCM where a block should be allocated. SCM responds with the
|
* Asks SCM where a block should be allocated. SCM responds with the
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||||
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||||
@ -83,8 +84,13 @@ public AllocatedBlock allocateBlock(long size,
|
|||||||
Preconditions.checkArgument(size > 0, "block size must be greater than 0");
|
Preconditions.checkArgument(size > 0, "block size must be greater than 0");
|
||||||
|
|
||||||
AllocateScmBlockRequestProto request =
|
AllocateScmBlockRequestProto request =
|
||||||
AllocateScmBlockRequestProto.newBuilder().setSize(size).setType(type)
|
AllocateScmBlockRequestProto.newBuilder()
|
||||||
.setFactor(factor).setOwner(owner).build();
|
.setSize(size)
|
||||||
|
.setType(type)
|
||||||
|
.setFactor(factor)
|
||||||
|
.setOwner(owner)
|
||||||
|
.setTraceID(TracingUtil.exportCurrentSpan())
|
||||||
|
.build();
|
||||||
final AllocateScmBlockResponseProto response;
|
final AllocateScmBlockResponseProto response;
|
||||||
try {
|
try {
|
||||||
response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);
|
response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);
|
||||||
@ -117,7 +123,9 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
|
|||||||
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
|
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
|
||||||
.map(BlockGroup::getProto).collect(Collectors.toList());
|
.map(BlockGroup::getProto).collect(Collectors.toList());
|
||||||
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
|
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
|
||||||
.newBuilder().addAllKeyBlocks(keyBlocksProto).build();
|
.newBuilder()
|
||||||
|
.addAllKeyBlocks(keyBlocksProto)
|
||||||
|
.build();
|
||||||
|
|
||||||
final DeleteScmKeyBlocksResponseProto resp;
|
final DeleteScmKeyBlocksResponseProto resp;
|
||||||
try {
|
try {
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
import io.opentracing.Scope;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||||
@ -37,6 +39,7 @@
|
|||||||
.DeleteScmKeyBlocksRequestProto;
|
.DeleteScmKeyBlocksRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
||||||
.DeleteScmKeyBlocksResponseProto;
|
.DeleteScmKeyBlocksResponseProto;
|
||||||
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||||
|
|
||||||
@ -69,7 +72,9 @@ public ScmBlockLocationProtocolServerSideTranslatorPB(
|
|||||||
public AllocateScmBlockResponseProto allocateScmBlock(
|
public AllocateScmBlockResponseProto allocateScmBlock(
|
||||||
RpcController controller, AllocateScmBlockRequestProto request)
|
RpcController controller, AllocateScmBlockRequestProto request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try (Scope scope = TracingUtil
|
||||||
|
.importAndCreateScope("ScmBlockLocationProtocol.allocateBlock",
|
||||||
|
request.getTraceID())) {
|
||||||
AllocatedBlock allocatedBlock =
|
AllocatedBlock allocatedBlock =
|
||||||
impl.allocateBlock(request.getSize(), request.getType(),
|
impl.allocateBlock(request.getSize(), request.getType(),
|
||||||
request.getFactor(), request.getOwner());
|
request.getFactor(), request.getOwner());
|
||||||
|
@ -41,6 +41,7 @@ message AllocateScmBlockRequestProto {
|
|||||||
required ReplicationType type = 2;
|
required ReplicationType type = 2;
|
||||||
required hadoop.hdds.ReplicationFactor factor = 3;
|
required hadoop.hdds.ReplicationFactor factor = 3;
|
||||||
required string owner = 4;
|
required string owner = 4;
|
||||||
|
optional string traceID = 5;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,6 +72,8 @@ message KeyBlocks {
|
|||||||
*/
|
*/
|
||||||
message DeleteScmKeyBlocksResponseProto {
|
message DeleteScmKeyBlocksResponseProto {
|
||||||
repeated DeleteKeyBlocksResultProto results = 1;
|
repeated DeleteKeyBlocksResultProto results = 1;
|
||||||
|
optional string traceID = 2;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -295,4 +295,9 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
|
|||||||
.withException(throwable)
|
.withException(throwable)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import com.sun.jersey.api.core.ApplicationAdapter;
|
import com.sun.jersey.api.core.ApplicationAdapter;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
||||||
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||||
@ -72,7 +73,7 @@ public final class ObjectStoreHandler implements Closeable {
|
|||||||
private final OzoneManagerProtocol ozoneManagerClient;
|
private final OzoneManagerProtocol ozoneManagerClient;
|
||||||
private final StorageContainerLocationProtocol
|
private final StorageContainerLocationProtocol
|
||||||
storageContainerLocationClient;
|
storageContainerLocationClient;
|
||||||
private final ScmBlockLocationProtocolClientSideTranslatorPB
|
private final ScmBlockLocationProtocol
|
||||||
scmBlockLocationClient;
|
scmBlockLocationClient;
|
||||||
private final StorageHandler storageHandler;
|
private final StorageHandler storageHandler;
|
||||||
private ClientId clientId = ClientId.randomId();
|
private ClientId clientId = ClientId.randomId();
|
||||||
@ -108,11 +109,13 @@ public ObjectStoreHandler(Configuration conf) throws IOException {
|
|||||||
InetSocketAddress scmBlockAddress =
|
InetSocketAddress scmBlockAddress =
|
||||||
getScmAddressForBlockClients(conf);
|
getScmAddressForBlockClients(conf);
|
||||||
this.scmBlockLocationClient =
|
this.scmBlockLocationClient =
|
||||||
new ScmBlockLocationProtocolClientSideTranslatorPB(
|
TracingUtil.createProxy(
|
||||||
RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
|
new ScmBlockLocationProtocolClientSideTranslatorPB(
|
||||||
scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
|
RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
|
||||||
NetUtils.getDefaultSocketFactory(conf),
|
scmBlockAddress, UserGroupInformation.getCurrentUser(),
|
||||||
Client.getRpcTimeout(conf)));
|
conf, NetUtils.getDefaultSocketFactory(conf),
|
||||||
|
Client.getRpcTimeout(conf))),
|
||||||
|
ScmBlockLocationProtocol.class);
|
||||||
|
|
||||||
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
|
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
|
@ -724,7 +724,8 @@ private static ScmBlockLocationProtocol getScmBlockClient(
|
|||||||
scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
|
scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
|
||||||
NetUtils.getDefaultSocketFactory(conf),
|
NetUtils.getDefaultSocketFactory(conf),
|
||||||
Client.getRpcTimeout(conf)));
|
Client.getRpcTimeout(conf)));
|
||||||
return scmBlockLocationClient;
|
return TracingUtil
|
||||||
|
.createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -178,4 +178,9 @@ public ScmInfo getScmInfo() throws IOException {
|
|||||||
.setScmId(scmId);
|
.setScmId(scmId);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user