HDFS-6879. Adding tracing to Hadoop RPC. Contributed by Masatake Iwasaki.

Colin Patrick Mccabe 2014-08-27 14:12:05 -07:00
parent b6b95ff667
commit 6962510f72
15 changed files with 738 additions and 6 deletions

View File

@ -224,6 +224,10 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.htrace</groupId>
<artifactId>htrace-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>

View File

@ -88,6 +88,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.htrace.Trace;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -694,6 +695,9 @@ private synchronized void setupIOstreams() {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connecting to " + server);
}
short numRetries = 0;
Random rand = null;
while (true) {
@ -758,6 +762,10 @@ public AuthMethod run()
// update last activity time
touch();
if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connected to " + server);
}
// start the receiver thread after the socket connection has been set
// up
start();

View File

@ -48,6 +48,9 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@ -191,6 +194,16 @@ public Object invoke(Object proxy, Method method, Object[] args)
+ method.getName() + "]");
}
TraceScope traceScope = null;
// if Tracing is on then start a new span for this rpc.
// guard it in the if statement to make sure there isn't
// any extra string manipulation.
if (Trace.isTracing()) {
traceScope = Trace.startSpan(
method.getDeclaringClass().getCanonicalName() +
"." + method.getName());
}
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
if (LOG.isTraceEnabled()) {
@ -212,8 +225,13 @@ public Object invoke(Object proxy, Method method, Object[] args)
remoteId + ": " + method.getName() +
" {" + e + "}");
}
if (Trace.isTracing()) {
traceScope.getSpan().addTimelineAnnotation(
"Call got exception: " + e.getMessage());
}
throw new ServiceException(e);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {

View File

@ -79,6 +79,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
@ -115,6 +116,10 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceInfo;
import org.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
@ -506,6 +511,7 @@ public static class Call implements Schedulable {
private ByteBuffer rpcResponse; // the response for this call
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
private final Span traceSpan; // the tracing span on the server side
public Call(int id, int retryCount, Writable param,
Connection connection) {
@ -515,6 +521,11 @@ public Call(int id, int retryCount, Writable param,
public Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) {
this(id, retryCount, param, connection, kind, clientId, null);
}
public Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId, Span span) {
this.callId = id;
this.retryCount = retryCount;
this.rpcRequest = param;
@ -523,6 +534,7 @@ public Call(int id, int retryCount, Writable param, Connection connection,
this.rpcResponse = null;
this.rpcKind = kind;
this.clientId = clientId;
this.traceSpan = span;
}
@Override
@ -1921,9 +1933,18 @@ private void processRpcRequest(RpcRequestHeaderProto header,
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
Span traceSpan = null;
if (header.hasTraceInfo()) {
// If the incoming RPC included tracing info, always continue the trace
TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),
header.getTraceInfo().getParentId());
traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();
}
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
.getClientId().toByteArray());
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
@ -2067,6 +2088,7 @@ public void run() {
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
TraceScope traceScope = null;
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
@ -2083,6 +2105,10 @@ public void run() {
Writable value = null;
CurCall.set(call);
if (call.traceSpan != null) {
traceScope = Trace.continueSpan(call.traceSpan);
}
try {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
@ -2156,9 +2182,22 @@ public Writable run() throws Exception {
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
if (Trace.isTracing()) {
traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " +
StringUtils.stringifyException(e));
}
}
} catch (Exception e) {
LOG.info(Thread.currentThread().getName() + " caught an exception", e);
if (Trace.isTracing()) {
traceScope.getSpan().addTimelineAnnotation("Exception: " +
StringUtils.stringifyException(e));
}
} finally {
if (traceScope != null) {
traceScope.close();
}
IOUtils.cleanup(LOG, traceScope);
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");

View File

@ -41,6 +41,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** An RpcEngine implementation for Writable data. */
@InterfaceStability.Evolving
@ -227,9 +229,19 @@ public Object invoke(Object proxy, Method method, Object[] args)
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
ObjectWritable value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
TraceScope traceScope = null;
if (Trace.isTracing()) {
traceScope = Trace.startSpan(
method.getDeclaringClass().getCanonicalName() +
"." + method.getName());
}
ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.htrace.HTraceConfiguration;
import org.htrace.SpanReceiver;
import org.htrace.Trace;
/**
* This class provides functions for reading the names of SpanReceivers from
* the Hadoop configuration, adding those SpanReceivers to the Tracer,
* and closing those SpanReceivers when appropriate.
* This class does nothing if no SpanReceiver is configured.
*/
@InterfaceAudience.Private
public class SpanReceiverHost {
public static final String SPAN_RECEIVERS_CONF_KEY = "hadoop.trace.spanreceiver.classes";
private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
private Collection<SpanReceiver> receivers = new HashSet<SpanReceiver>();
private boolean closed = false;
private static enum SingletonHolder {
INSTANCE;
Object lock = new Object();
SpanReceiverHost host = null;
}
public static SpanReceiverHost getInstance(Configuration conf) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
}
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
}
SpanReceiverHost host = new SpanReceiverHost();
host.loadSpanReceivers(conf);
SingletonHolder.INSTANCE.host = host;
ShutdownHookManager.get().addShutdownHook(new Runnable() {
public void run() {
SingletonHolder.INSTANCE.host.closeReceivers();
}
}, 0);
return SingletonHolder.INSTANCE.host;
}
}
/**
* Reads the names of classes specified in the
* "hadoop.trace.spanreceiver.classes" property and instantiates and registers
* them with the Tracer as SpanReceivers.
*
* The nullary constructor is called during construction, but if the classes
* specified implement the Configurable interface, setConf() will be
* called on them. This allows SpanReceivers to use values from the Hadoop
* configuration.
*/
public void loadSpanReceivers(Configuration conf) {
Class<?> implClass = null;
String[] receiverNames = conf.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY);
if (receiverNames == null || receiverNames.length == 0) {
return;
}
for (String className : receiverNames) {
className = className.trim();
try {
implClass = Class.forName(className);
receivers.add(loadInstance(implClass, conf));
LOG.info("SpanReceiver " + className + " was loaded successfully.");
} catch (ClassNotFoundException e) {
LOG.warn("Class " + className + " cannot be found.", e);
} catch (IOException e) {
LOG.warn("Load SpanReceiver " + className + " failed.", e);
}
}
for (SpanReceiver rcvr : receivers) {
Trace.addReceiver(rcvr);
}
}
private SpanReceiver loadInstance(Class<?> implClass, Configuration conf)
throws IOException {
SpanReceiver impl;
try {
Object o = ReflectionUtils.newInstance(implClass, conf);
impl = (SpanReceiver)o;
impl.configure(wrapHadoopConf(conf));
} catch (SecurityException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new IOException(e);
} catch (RuntimeException e) {
throw new IOException(e);
}
return impl;
}
private static HTraceConfiguration wrapHadoopConf(final Configuration conf) {
return new HTraceConfiguration() {
public static final String HTRACE_CONF_PREFIX = "hadoop.";
@Override
public String get(String key) {
return conf.get(HTRACE_CONF_PREFIX + key);
}
@Override
public String get(String key, String defaultValue) {
return conf.get(HTRACE_CONF_PREFIX + key, defaultValue);
}
};
}
/**
* Calls close() on all SpanReceivers created by this SpanReceiverHost.
*/
public synchronized void closeReceivers() {
if (closed) return;
closed = true;
for (SpanReceiver rcvr : receivers) {
try {
rcvr.close();
} catch (IOException e) {
LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e);
}
}
}
}

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
import org.htrace.Span;
import org.htrace.Trace;
import com.google.protobuf.ByteString;
@ -165,6 +167,15 @@ public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
.setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
// Add tracing info if we are currently tracing.
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
result.setTraceInfo(RPCTraceInfoProto.newBuilder()
.setParentId(s.getSpanId())
.setTraceId(s.getTraceId()).build());
}
return result.build();
}
}

View File

@ -53,6 +53,18 @@ enum RpcKindProto {
/**
* Used to pass through the information necessary to continue
* a trace after an RPC is made. All we need is the traceid
* (so we know the overarching trace this message is a part of), and
* the id of the current span when this message was sent, so we know
* what span caused the new span we will create when this message is received.
*/
message RPCTraceInfoProto {
optional int64 traceId = 1;
optional int64 parentId = 2;
}
message RpcRequestHeaderProto { // the header for the RpcRequest
enum OperationProto {
RPC_FINAL_PACKET = 0; // The final RPC Packet
@ -67,6 +79,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
// clientId + callId uniquely identifies a request
// retry count, 1 means this is the first retry
optional sint32 retryCount = 5 [default = -1];
optional RPCTraceInfoProto traceInfo = 6; // tracing info
}

View File

@ -0,0 +1,169 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
Hadoop Distributed File System-${project.version} - Enabling Dapper-like Tracing
---
---
${maven.build.timestamp}
Enabling Dapper-like Tracing in HDFS
%{toc|section=1|fromDepth=0}
* {Dapper-like Tracing in HDFS}
** HTrace
{{{https://issues.apache.org/jira/browse/HDFS-5274}HDFS-5274}}
added support for tracing requests through HDFS,
using the open source tracing library, {{{https://github.com/cloudera/htrace}HTrace}}.
Setting up tracing is quite simple; however, it requires some minor changes to your client code.
** SpanReceivers
The tracing system works by collecting information in structs called 'Spans'.
It is up to you to choose how you want to receive this information
by implementing the SpanReceiver interface, which defines one method:
+----
public void receiveSpan(Span span);
+----
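For illustration only, a minimal receiver might look like the sketch below
(the class name <<<StdoutSpanReceiver>>> is hypothetical and not part of this
patch); it simply prints the description of every span it receives. Once
compiled and placed on the classpath, it could be registered through the
<<<hadoop.trace.spanreceiver.classes>>> property described next.

+----
import java.io.IOException;

import org.htrace.HTraceConfiguration;
import org.htrace.Span;
import org.htrace.SpanReceiver;

// Hypothetical example receiver: prints each completed span's description.
public class StdoutSpanReceiver implements SpanReceiver {
  @Override
  public void configure(HTraceConfiguration conf) {
    // This sketch needs no configuration.
  }

  @Override
  public void receiveSpan(Span span) {
    // Called for every span completed in this process.
    System.out.println(span.getDescription());
  }

  @Override
  public void close() throws IOException {
    // Nothing to flush or release.
  }
}
+----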
Configure the SpanReceivers you'd like to use
by putting a comma-separated list of the fully-qualified class names of
classes implementing SpanReceiver
in the <<<hdfs-site.xml>>> property <<<hadoop.trace.spanreceiver.classes>>>.
+----
<property>
<name>hadoop.trace.spanreceiver.classes</name>
<value>org.htrace.impl.LocalFileSpanReceiver</value>
</property>
<property>
<name>hadoop.local-file-span-receiver.path</name>
<value>/var/log/hadoop/htrace.out</value>
</property>
+----
** Setting up ZipkinSpanReceiver
Instead of implementing SpanReceiver yourself,
you can use <<<ZipkinSpanReceiver>>>, which uses
{{{https://github.com/twitter/zipkin}Zipkin}}
for collecting and displaying tracing data.
In order to use <<<ZipkinSpanReceiver>>>,
you need to download and set up {{{https://github.com/twitter/zipkin}Zipkin}} first.
You also need to add the jar of <<<htrace-zipkin>>> to the classpath of Hadoop on each node.
Here is an example setup procedure.
+----
$ git clone https://github.com/cloudera/htrace
$ cd htrace/htrace-zipkin
$ mvn compile assembly:single
$ cp target/htrace-zipkin-*-jar-with-dependencies.jar $HADOOP_HOME/share/hadoop/hdfs/lib/
+----
The sample configuration for <<<ZipkinSpanReceiver>>> is shown below.
By adding these to the <<<hdfs-site.xml>>> of the NameNode and DataNodes,
<<<ZipkinSpanReceiver>>> is initialized on startup.
You also need this configuration on the client node in addition to the servers.
+----
<property>
<name>hadoop.trace.spanreceiver.classes</name>
<value>org.htrace.impl.ZipkinSpanReceiver</value>
</property>
<property>
<name>hadoop.zipkin.collector-hostname</name>
<value>192.168.1.2</value>
</property>
<property>
<name>hadoop.zipkin.collector-port</name>
<value>9410</value>
</property>
+----
** Turning on tracing by HTrace API
In order to turn on Dapper-like tracing,
you will need to wrap the traced logic with a <<tracing span>> as shown below.
When there are running tracing spans,
the tracing information is propagated to servers along with RPC requests.
In addition, you need to initialize <<<SpanReceiver>>> once per process.
+----
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
...
SpanReceiverHost.getInstance(new HdfsConfiguration());
...
TraceScope ts = Trace.startSpan("Gets", Sampler.ALWAYS);
try {
... // traced logic
} finally {
if (ts != null) ts.close();
}
+----
** Sample code for tracing
The <<<TracingFsShell.java>>> shown below is a wrapper around FsShell
which starts a tracing span before invoking an HDFS shell command.
+----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.util.ToolRunner;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
public class TracingFsShell {
public static void main(String argv[]) throws Exception {
Configuration conf = new Configuration();
FsShell shell = new FsShell();
conf.setQuietMode(false);
shell.setConf(conf);
int res = 0;
SpanReceiverHost.getInstance(new HdfsConfiguration());
TraceScope ts = null;
try {
ts = Trace.startSpan("FsShell", Sampler.ALWAYS);
res = ToolRunner.run(shell, argv);
} finally {
shell.close();
if (ts != null) ts.close();
}
System.exit(res);
}
}
+----
You can compile and execute this code as shown below.
+----
$ javac -cp `hadoop classpath` TracingFsShell.java
$ HADOOP_CLASSPATH=. hdfs TracingFsShell -put sample.txt /tmp/
+----

View File

@ -181,6 +181,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>xercesImpl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.htrace</groupId>
<artifactId>htrace-core</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -180,6 +180,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
@ -326,6 +327,8 @@ public static InetSocketAddress createSocketAddr(String target) {
private boolean isPermissionEnabled;
private String dnUserName = null;
private SpanReceiverHost spanReceiverHost;
/**
* Create the DataNode given a configuration, an array of dataDirs,
* and a namenode proxy
@ -823,6 +826,7 @@ void startDataNode(Configuration conf,
this.dataDirs = dataDirs;
this.conf = conf;
this.dnConf = new DNConf(conf);
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
if (dnConf.maxLockedMemory > 0) {
if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
@ -1510,6 +1514,9 @@ public void shutdown() {
MBeans.unregister(dataNodeInfoBeanName);
dataNodeInfoBeanName = null;
}
if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
LOG.info("Shutdown complete.");
synchronized(this) {

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
@ -278,6 +279,7 @@ public long getProtocolVersion(String protocol,
private JvmPauseMonitor pauseMonitor;
private ObjectName nameNodeStatusBeanName;
private SpanReceiverHost spanReceiverHost;
/**
* The namenode address that clients will use to access this namenode
* or the name service. For HA configurations using logical URI, it
@ -586,6 +588,9 @@ protected void initialize(Configuration conf) throws IOException {
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
loadNamesystem(conf);
rpcServer = createRpcServer(conf);
@ -822,6 +827,9 @@ public void stop() {
MBeans.unregister(nameNodeStatusBeanName);
nameNodeStatusBeanName = null;
}
if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
}
}

View File

@ -0,0 +1,280 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.htrace.HTraceConfiguration;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.SpanReceiver;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TestTracing {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
@Test
public void testSpanReceiverHost() throws Exception {
Configuration conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
SetSpanReceiver.class.getName());
SpanReceiverHost spanReceiverHost = SpanReceiverHost.getInstance(conf);
}
@Test
public void testWriteTraceHooks() throws Exception {
long startTime = System.currentTimeMillis();
TraceScope ts = Trace.startSpan("testWriteTraceHooks", Sampler.ALWAYS);
Path file = new Path("traceWriteTest.dat");
FSDataOutputStream stream = dfs.create(file);
for (int i = 0; i < 10; i++) {
byte[] data = RandomStringUtils.randomAlphabetic(102400).getBytes();
stream.write(data);
}
stream.hflush();
stream.close();
long endTime = System.currentTimeMillis();
ts.close();
String[] expectedSpanNames = {
"testWriteTraceHooks",
"org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create",
"org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete"
};
assertSpanNamesFound(expectedSpanNames);
// The trace should last about the same amount of time as the test
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
Span s = map.get("testWriteTraceHooks").get(0);
Assert.assertNotNull(s);
long spanStart = s.getStartTimeMillis();
long spanEnd = s.getStopTimeMillis();
Assert.assertTrue(spanStart - startTime < 100);
Assert.assertTrue(spanEnd - endTime < 100);
// There should only be one trace id as it should all be homed in the
// top trace.
for (Span span : SetSpanReceiver.SetHolder.spans) {
Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
}
}
@Test
public void testWriteWithoutTraceHooks() throws Exception {
Path file = new Path("withoutTraceWriteTest.dat");
FSDataOutputStream stream = dfs.create(file);
for (int i = 0; i < 10; i++) {
byte[] data = RandomStringUtils.randomAlphabetic(102400).getBytes();
stream.write(data);
}
stream.hflush();
stream.close();
Assert.assertTrue(SetSpanReceiver.SetHolder.size() == 0);
}
@Test
public void testReadTraceHooks() throws Exception {
String fileName = "traceReadTest.dat";
Path filePath = new Path(fileName);
// Create the file.
FSDataOutputStream ostream = dfs.create(filePath);
for (int i = 0; i < 50; i++) {
byte[] data = RandomStringUtils.randomAlphabetic(10240).getBytes();
ostream.write(data);
}
ostream.close();
long startTime = System.currentTimeMillis();
TraceScope ts = Trace.startSpan("testReadTraceHooks", Sampler.ALWAYS);
FSDataInputStream istream = dfs.open(filePath, 10240);
ByteBuffer buf = ByteBuffer.allocate(10240);
int count = 0;
try {
while (istream.read(buf) > 0) {
count += 1;
buf.clear();
istream.seek(istream.getPos() + 5);
}
} catch (IOException ioe) {
// Ignore this; it's probably a seek after EOF.
} finally {
istream.close();
}
ts.getSpan().addTimelineAnnotation("count: " + count);
long endTime = System.currentTimeMillis();
ts.close();
String[] expectedSpanNames = {
"testReadTraceHooks",
"org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations"
};
assertSpanNamesFound(expectedSpanNames);
// The trace should last about the same amount of time as the test
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
Span s = map.get("testReadTraceHooks").get(0);
Assert.assertNotNull(s);
long spanStart = s.getStartTimeMillis();
long spanEnd = s.getStopTimeMillis();
Assert.assertTrue(spanStart - startTime < 100);
Assert.assertTrue(spanEnd - endTime < 100);
// There should only be one trace id as it should all be homed in the
// top trace.
for (Span span : SetSpanReceiver.SetHolder.spans) {
Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
}
}
@Test
public void testReadWithoutTraceHooks() throws Exception {
String fileName = "withoutTraceReadTest.dat";
Path filePath = new Path(fileName);
// Create the file.
FSDataOutputStream ostream = dfs.create(filePath);
for (int i = 0; i < 50; i++) {
byte[] data = RandomStringUtils.randomAlphabetic(10240).getBytes();
ostream.write(data);
}
ostream.close();
FSDataInputStream istream = dfs.open(filePath, 10240);
ByteBuffer buf = ByteBuffer.allocate(10240);
int count = 0;
try {
while (istream.read(buf) > 0) {
count += 1;
buf.clear();
istream.seek(istream.getPos() + 5);
}
} catch (IOException ioe) {
// Ignore this; it's probably a seek after EOF.
} finally {
istream.close();
}
Assert.assertTrue(SetSpanReceiver.SetHolder.size() == 0);
}
@Before
public void cleanSet() {
SetSpanReceiver.SetHolder.spans.clear();
}
@BeforeClass
public static void setupCluster() throws IOException {
conf = new Configuration();
conf.setLong("dfs.blocksize", 100 * 1024);
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
SetSpanReceiver.class.getName());
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.build();
dfs = cluster.getFileSystem();
}
@AfterClass
public static void shutDown() throws IOException {
cluster.shutdown();
}
private void assertSpanNamesFound(String[] expectedSpanNames) {
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
for (String spanName : expectedSpanNames) {
Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null);
}
}
/**
* Span receiver that puts all spans into a single set.
* This is useful for testing.
* <p/>
* We're not using HTrace's POJOReceiver here because it doesn't
* push all the spans to a static place, which would make testing
* SpanReceiverHost harder.
*/
public static class SetSpanReceiver implements SpanReceiver {
public void configure(HTraceConfiguration conf) {
}
public void receiveSpan(Span span) {
SetHolder.spans.add(span);
}
public void close() {
}
public static class SetHolder {
public static Set<Span> spans = new HashSet<Span>();
public static int size() {
return spans.size();
}
public static Map<String, List<Span>> getMap() {
Map<String, List<Span>> map = new HashMap<String, List<Span>>();
for (Span s : spans) {
List<Span> l = map.get(s.getDescription());
if (l == null) {
l = new LinkedList<Span>();
map.put(s.getDescription(), l);
}
l.add(s);
}
return map;
}
}
}
}

View File

@ -677,6 +677,11 @@
<artifactId>jsch</artifactId>
<version>0.1.42</version>
</dependency>
<dependency>
<groupId>org.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.jdom</groupId>
<artifactId>jdom</artifactId>

View File

@ -65,6 +65,7 @@
<item name="Service Level Authorization" href="hadoop-project-dist/hadoop-common/ServiceLevelAuth.html"/>
<item name="HTTP Authentication" href="hadoop-project-dist/hadoop-common/HttpAuthentication.html"/>
<item name="Hadoop KMS" href="hadoop-kms/index.html"/>
<item name="Tracing" href="hadoop-project-dist/hadoop-common/Tracing.html"/>
</menu>
<menu name="HDFS" inherit="top">