HDFS-6880. Adding tracing to DataNode data transfer protocol (iwasakims via cmccabe)
This commit is contained in:
parent
8e5d6713cf
commit
56119fec96
@ -461,6 +461,9 @@ Release 2.6.0 - UNRELEASED
|
||||
HDFS-7059. HAadmin transtionToActive with forceActive option can show
|
||||
confusing message.
|
||||
|
||||
HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims
|
||||
via cmccabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
@ -88,6 +88,10 @@
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import org.htrace.Span;
|
||||
import org.htrace.Trace;
|
||||
import org.htrace.TraceScope;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
@ -355,12 +359,22 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
|
||||
/** Append on an existing block? */
|
||||
private final boolean isAppend;
|
||||
|
||||
private final Span traceSpan;
|
||||
|
||||
/**
|
||||
* Default construction for file create
|
||||
*/
|
||||
private DataStreamer() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* construction with tracing info
|
||||
*/
|
||||
private DataStreamer(Span span) {
|
||||
isAppend = false;
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
traceSpan = span;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -371,9 +385,10 @@ private DataStreamer() {
|
||||
* @throws IOException if error occurs
|
||||
*/
|
||||
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||
int bytesPerChecksum) throws IOException {
|
||||
int bytesPerChecksum, Span span) throws IOException {
|
||||
isAppend = true;
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
|
||||
traceSpan = span;
|
||||
block = lastBlock.getBlock();
|
||||
bytesSent = block.getNumBytes();
|
||||
accessToken = lastBlock.getBlockToken();
|
||||
@ -463,6 +478,10 @@ private void endBlock() {
|
||||
@Override
|
||||
public void run() {
|
||||
long lastPacket = Time.now();
|
||||
TraceScope traceScope = null;
|
||||
if (traceSpan != null) {
|
||||
traceScope = Trace.continueSpan(traceSpan);
|
||||
}
|
||||
while (!streamerClosed && dfsClient.clientRunning) {
|
||||
|
||||
// if the Responder encountered an error, shutdown Responder
|
||||
@ -636,6 +655,9 @@ public void run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (traceScope != null) {
|
||||
traceScope.close();
|
||||
}
|
||||
closeInternal();
|
||||
}
|
||||
|
||||
@ -1611,7 +1633,11 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
|
||||
streamer = new DataStreamer();
|
||||
Span traceSpan = null;
|
||||
if (Trace.isTracing()) {
|
||||
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
|
||||
}
|
||||
streamer = new DataStreamer(traceSpan);
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
streamer.setFavoredNodes(favoredNodes);
|
||||
}
|
||||
@ -1652,15 +1678,21 @@ private DFSOutputStream(DFSClient dfsClient, String src,
|
||||
this(dfsClient, src, progress, stat, checksum);
|
||||
initialFileSize = stat.getLen(); // length of file when opened
|
||||
|
||||
Span traceSpan = null;
|
||||
if (Trace.isTracing()) {
|
||||
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
|
||||
}
|
||||
|
||||
// The last partial block of the file has to be filled.
|
||||
if (lastBlock != null) {
|
||||
// indicate that we are appending to an existing block
|
||||
bytesCurBlock = lastBlock.getBlockSize();
|
||||
streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
|
||||
streamer = new DataStreamer(lastBlock, stat,
|
||||
checksum.getBytesPerChecksum(), traceSpan);
|
||||
} else {
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
streamer = new DataStreamer();
|
||||
streamer = new DataStreamer(traceSpan);
|
||||
}
|
||||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||
}
|
||||
|
@ -25,12 +25,16 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import org.htrace.Span;
|
||||
import org.htrace.Trace;
|
||||
import org.htrace.TraceInfo;
|
||||
import org.htrace.TraceScope;
|
||||
|
||||
/**
|
||||
* Static utilities for dealing with the protocol buffers used by the
|
||||
@ -78,9 +82,41 @@ static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
|
||||
|
||||
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
|
||||
Token<BlockTokenIdentifier> blockToken) {
|
||||
return BaseHeaderProto.newBuilder()
|
||||
BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
|
||||
.setBlock(PBHelper.convert(blk))
|
||||
.setToken(PBHelper.convert(blockToken))
|
||||
.build();
|
||||
.setToken(PBHelper.convert(blockToken));
|
||||
if (Trace.isTracing()) {
|
||||
Span s = Trace.currentSpan();
|
||||
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
|
||||
.setTraceId(s.getTraceId())
|
||||
.setParentId(s.getSpanId()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
|
||||
if (proto == null) return null;
|
||||
if (!proto.hasTraceId()) return null;
|
||||
return new TraceInfo(proto.getTraceId(), proto.getParentId());
|
||||
}
|
||||
|
||||
public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
|
||||
String description) {
|
||||
return continueTraceSpan(header.getBaseHeader(), description);
|
||||
}
|
||||
|
||||
public static TraceScope continueTraceSpan(BaseHeaderProto header,
|
||||
String description) {
|
||||
return continueTraceSpan(header.getTraceInfo(), description);
|
||||
}
|
||||
|
||||
public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
|
||||
String description) {
|
||||
TraceScope scope = null;
|
||||
TraceInfo info = fromProto(proto);
|
||||
if (info != null) {
|
||||
scope = Trace.startSpan(description, info);
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer;
|
||||
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
|
||||
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
@ -39,6 +40,7 @@
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
||||
import org.htrace.TraceScope;
|
||||
|
||||
/** Receiver */
|
||||
@InterfaceAudience.Private
|
||||
@ -108,7 +110,10 @@ static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy)
|
||||
/** Receive OP_READ_BLOCK */
|
||||
private void opReadBlock() throws IOException {
|
||||
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
|
||||
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
proto.getOffset(),
|
||||
@ -117,27 +122,36 @@ private void opReadBlock() throws IOException {
|
||||
(proto.hasCachingStrategy() ?
|
||||
getCachingStrategy(proto.getCachingStrategy()) :
|
||||
CachingStrategy.newDefaultStrategy()));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive OP_WRITE_BLOCK */
|
||||
private void opWriteBlock(DataInputStream in) throws IOException {
|
||||
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
|
||||
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
|
||||
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
|
||||
PBHelper.convert(proto.getSource()),
|
||||
fromProto(proto.getStage()),
|
||||
proto.getPipelineSize(),
|
||||
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
|
||||
proto.getLatestGenerationStamp(),
|
||||
fromProto(proto.getRequestedChecksum()),
|
||||
(proto.hasCachingStrategy() ?
|
||||
getCachingStrategy(proto.getCachingStrategy()) :
|
||||
CachingStrategy.newDefaultStrategy()));
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
|
||||
PBHelper.convert(proto.getSource()),
|
||||
fromProto(proto.getStage()),
|
||||
proto.getPipelineSize(),
|
||||
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
|
||||
proto.getLatestGenerationStamp(),
|
||||
fromProto(proto.getRequestedChecksum()),
|
||||
(proto.hasCachingStrategy() ?
|
||||
getCachingStrategy(proto.getCachingStrategy()) :
|
||||
CachingStrategy.newDefaultStrategy()));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive {@link Op#TRANSFER_BLOCK} */
|
||||
@ -145,11 +159,17 @@ private void opTransferBlock(DataInputStream in) throws IOException {
|
||||
final OpTransferBlockProto proto =
|
||||
OpTransferBlockProto.parseFrom(vintPrefixed(in));
|
||||
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
|
||||
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
targets,
|
||||
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
|
||||
@ -158,9 +178,15 @@ private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
|
||||
OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
|
||||
SlotId slotId = (proto.hasSlotId()) ?
|
||||
PBHelper.convert(proto.getSlotId()) : null;
|
||||
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getToken()),
|
||||
slotId, proto.getMaxVersion());
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getToken()),
|
||||
slotId, proto.getMaxVersion());
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
|
||||
@ -168,38 +194,67 @@ private void opReleaseShortCircuitFds(DataInputStream in)
|
||||
throws IOException {
|
||||
final ReleaseShortCircuitAccessRequestProto proto =
|
||||
ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
|
||||
releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
|
||||
TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
|
||||
private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
|
||||
final ShortCircuitShmRequestProto proto =
|
||||
ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
|
||||
requestShortCircuitShm(proto.getClientName());
|
||||
TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
requestShortCircuitShm(proto.getClientName());
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive OP_REPLACE_BLOCK */
|
||||
private void opReplaceBlock(DataInputStream in) throws IOException {
|
||||
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
|
||||
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getToken()),
|
||||
proto.getDelHint(),
|
||||
PBHelper.convert(proto.getSource()));
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convertStorageType(proto.getStorageType()),
|
||||
PBHelper.convert(proto.getHeader().getToken()),
|
||||
proto.getDelHint(),
|
||||
PBHelper.convert(proto.getSource()));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive OP_COPY_BLOCK */
|
||||
private void opCopyBlock(DataInputStream in) throws IOException {
|
||||
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
|
||||
copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getToken()));
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getToken()));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive OP_BLOCK_CHECKSUM */
|
||||
private void opBlockChecksum(DataInputStream in) throws IOException {
|
||||
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
|
||||
|
||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
||||
proto.getClass().getSimpleName());
|
||||
try {
|
||||
blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
|
||||
PBHelper.convert(proto.getHeader().getToken()));
|
||||
} finally {
|
||||
if (traceScope != null) traceScope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
|
||||
@ -47,6 +48,9 @@
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import org.htrace.Trace;
|
||||
import org.htrace.Span;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
/** Sender */
|
||||
@ -185,19 +189,29 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
|
||||
|
||||
@Override
|
||||
public void releaseShortCircuitFds(SlotId slotId) throws IOException {
|
||||
ReleaseShortCircuitAccessRequestProto proto =
|
||||
ReleaseShortCircuitAccessRequestProto.Builder builder =
|
||||
ReleaseShortCircuitAccessRequestProto.newBuilder().
|
||||
setSlotId(PBHelper.convert(slotId)).
|
||||
build();
|
||||
setSlotId(PBHelper.convert(slotId));
|
||||
if (Trace.isTracing()) {
|
||||
Span s = Trace.currentSpan();
|
||||
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
|
||||
.setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
|
||||
}
|
||||
ReleaseShortCircuitAccessRequestProto proto = builder.build();
|
||||
send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestShortCircuitShm(String clientName) throws IOException {
|
||||
ShortCircuitShmRequestProto proto =
|
||||
ShortCircuitShmRequestProto.Builder builder =
|
||||
ShortCircuitShmRequestProto.newBuilder().
|
||||
setClientName(clientName).
|
||||
build();
|
||||
setClientName(clientName);
|
||||
if (Trace.isTracing()) {
|
||||
Span s = Trace.currentSpan();
|
||||
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
|
||||
.setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
|
||||
}
|
||||
ShortCircuitShmRequestProto proto = builder.build();
|
||||
send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,12 @@ message DataTransferEncryptorMessageProto {
|
||||
message BaseHeaderProto {
|
||||
required ExtendedBlockProto block = 1;
|
||||
optional hadoop.common.TokenProto token = 2;
|
||||
optional DataTransferTraceInfoProto traceInfo = 3;
|
||||
}
|
||||
|
||||
message DataTransferTraceInfoProto {
|
||||
required uint64 traceId = 1;
|
||||
required uint64 parentId = 2;
|
||||
}
|
||||
|
||||
message ClientOperationHeaderProto {
|
||||
@ -166,6 +172,7 @@ message OpRequestShortCircuitAccessProto {
|
||||
|
||||
message ReleaseShortCircuitAccessRequestProto {
|
||||
required ShortCircuitShmSlotProto slotId = 1;
|
||||
optional DataTransferTraceInfoProto traceInfo = 2;
|
||||
}
|
||||
|
||||
message ReleaseShortCircuitAccessResponseProto {
|
||||
@ -177,6 +184,7 @@ message ShortCircuitShmRequestProto {
|
||||
// The name of the client requesting the shared memory segment. This is
|
||||
// purely for logging / debugging purposes.
|
||||
required string clientName = 1;
|
||||
optional DataTransferTraceInfoProto traceInfo = 2;
|
||||
}
|
||||
|
||||
message ShortCircuitShmResponseProto {
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.htrace.HTraceConfiguration;
|
||||
import org.htrace.Sampler;
|
||||
import org.htrace.Span;
|
||||
@ -39,11 +40,13 @@
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
public class TestTracing {
|
||||
|
||||
@ -81,7 +84,12 @@ public void testWriteTraceHooks() throws Exception {
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create",
|
||||
"org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync",
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete"
|
||||
"org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete",
|
||||
"DFSOutputStream",
|
||||
"OpWriteBlockProto",
|
||||
"org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.addBlock"
|
||||
};
|
||||
assertSpanNamesFound(expectedSpanNames);
|
||||
|
||||
@ -96,7 +104,7 @@ public void testWriteTraceHooks() throws Exception {
|
||||
|
||||
// There should only be one trace id as it should all be homed in the
|
||||
// top trace.
|
||||
for (Span span : SetSpanReceiver.SetHolder.spans) {
|
||||
for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
|
||||
Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
|
||||
}
|
||||
}
|
||||
@ -152,7 +160,8 @@ public void testReadTraceHooks() throws Exception {
|
||||
String[] expectedSpanNames = {
|
||||
"testReadTraceHooks",
|
||||
"org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations"
|
||||
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations",
|
||||
"OpReadBlockProto"
|
||||
};
|
||||
assertSpanNamesFound(expectedSpanNames);
|
||||
|
||||
@ -168,7 +177,7 @@ public void testReadTraceHooks() throws Exception {
|
||||
|
||||
// There should only be one trace id as it should all be homed in the
|
||||
// top trace.
|
||||
for (Span span : SetSpanReceiver.SetHolder.spans) {
|
||||
for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
|
||||
Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
|
||||
}
|
||||
}
|
||||
@ -228,10 +237,24 @@ public static void shutDown() throws IOException {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
private void assertSpanNamesFound(String[] expectedSpanNames) {
|
||||
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
|
||||
for (String spanName : expectedSpanNames) {
|
||||
Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null);
|
||||
static void assertSpanNamesFound(final String[] expectedSpanNames) {
|
||||
try {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
|
||||
for (String spanName : expectedSpanNames) {
|
||||
if (!map.containsKey(spanName)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}, 100, 1000);
|
||||
} catch (TimeoutException e) {
|
||||
Assert.fail("timed out to get expected spans: " + e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
Assert.fail("interrupted while waiting spans: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@ -249,15 +272,16 @@ public void configure(HTraceConfiguration conf) {
|
||||
}
|
||||
|
||||
public void receiveSpan(Span span) {
|
||||
SetHolder.spans.add(span);
|
||||
SetHolder.spans.put(span.getSpanId(), span);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public static class SetHolder {
|
||||
public static Set<Span> spans = new HashSet<Span>();
|
||||
|
||||
public static ConcurrentHashMap<Long, Span> spans =
|
||||
new ConcurrentHashMap<Long, Span>();
|
||||
|
||||
public static int size() {
|
||||
return spans.size();
|
||||
}
|
||||
@ -265,7 +289,7 @@ public static int size() {
|
||||
public static Map<String, List<Span>> getMap() {
|
||||
Map<String, List<Span>> map = new HashMap<String, List<Span>>();
|
||||
|
||||
for (Span s : spans) {
|
||||
for (Span s : spans.values()) {
|
||||
List<Span> l = map.get(s.getDescription());
|
||||
if (l == null) {
|
||||
l = new LinkedList<Span>();
|
||||
|
@ -0,0 +1,97 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tracing;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
import org.htrace.Sampler;
|
||||
import org.htrace.Span;
|
||||
import org.htrace.Trace;
|
||||
import org.htrace.TraceScope;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import java.io.IOException;
|
||||
|
||||
public class TestTracingShortCircuitLocalRead {
|
||||
private static Configuration conf;
|
||||
private static MiniDFSCluster cluster;
|
||||
private static DistributedFileSystem dfs;
|
||||
private static SpanReceiverHost spanReceiverHost;
|
||||
private static TemporarySocketDirectory sockDir;
|
||||
static final Path TEST_PATH = new Path("testShortCircuitTraceHooks");
|
||||
static final int TEST_LENGTH = 1234;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
sockDir = new TemporarySocketDirectory();
|
||||
DomainSocket.disableBindPathValidation();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws IOException {
|
||||
sockDir.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShortCircuitTraceHooks() throws IOException {
|
||||
conf = new Configuration();
|
||||
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
|
||||
TestTracing.SetSpanReceiver.class.getName());
|
||||
conf.setLong("dfs.blocksize", 100 * 1024);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||
"testShortCircuitTraceHooks._PORT");
|
||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
dfs = cluster.getFileSystem();
|
||||
|
||||
try {
|
||||
spanReceiverHost = SpanReceiverHost.getInstance(conf);
|
||||
DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);
|
||||
|
||||
TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
|
||||
FSDataInputStream stream = dfs.open(TEST_PATH);
|
||||
byte buf[] = new byte[TEST_LENGTH];
|
||||
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
|
||||
stream.close();
|
||||
ts.close();
|
||||
|
||||
String[] expectedSpanNames = {
|
||||
"OpRequestShortCircuitAccessProto",
|
||||
"ShortCircuitShmRequestProto"
|
||||
};
|
||||
TestTracing.assertSpanNamesFound(expectedSpanNames);
|
||||
} finally {
|
||||
dfs.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user