HDFS-6956. Allow dynamically changing the tracing level in Hadoop servers (cmccabe)

Colin Patrick Mccabe 2014-09-26 10:30:30 -07:00
parent a6049aa994
commit 55302ccfba
20 changed files with 999 additions and 32 deletions

View File

@@ -296,6 +296,10 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.tracing\.TraceAdminPB.*">
</Match>
<!--
Manually checked, misses child thread manually syncing on parent's intrinsic lock.

View File

@@ -348,6 +348,7 @@
<include>ProtobufRpcEngine.proto</include>
<include>Security.proto</include>
<include>GetUserMappingsProtocol.proto</include>
<include>TraceAdmin.proto</include>
<include>RefreshAuthorizationPolicyProtocol.proto</include>
<include>RefreshUserMappingsProtocol.proto</include>
<include>RefreshCallQueueProtocol.proto</include>

View File

@@ -35,6 +35,7 @@ function hadoop_usage()
echo " jar <jar> run a jar file"
echo " jnipath prints the java.library.path"
echo " key manage keys via the KeyProvider"
echo " trace view and modify Hadoop tracing settings"
echo " version print the version"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
@@ -159,6 +160,9 @@ case ${COMMAND} in
key)
CLASS=org.apache.hadoop.crypto.key.KeyShell
;;
trace)
CLASS=org.apache.hadoop.tracing.TraceAdmin
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
;;

View File

@@ -151,6 +151,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String
HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH =
"security.refresh.generic.protocol.acl";
public static final String
HADOOP_SECURITY_SERVICE_AUTHORIZATION_TRACING =
"security.trace.protocol.acl";
public static final String
SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
public static final String
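
For context, a hedged sketch (not part of this commit) of how the new security.trace.protocol.acl key would be used: it only takes effect when service-level authorization is enabled, and its value follows the usual comma-separated user/group ACL format.

  // Sketch only: restrict TraceAdminProtocol to the "hdfs" user when
  // hadoop.security.authorization is enabled. In practice this ACL would
  // normally be set in hadoop-policy.xml rather than programmatically.
  Configuration conf = new Configuration();
  conf.setBoolean("hadoop.security.authorization", true);
  conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_TRACING,
      "hdfs");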

View File

@@ -18,17 +18,23 @@
package org.apache.hadoop.tracing;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.htrace.HTraceConfiguration;
import org.htrace.SpanReceiver;
import org.htrace.Trace;
/**
* This class provides functions for reading the names of SpanReceivers from
@@ -37,11 +43,15 @@
* This class does nothing if no SpanReceiver is configured.
*/
@InterfaceAudience.Private
public class SpanReceiverHost {
public static final String SPAN_RECEIVERS_CONF_KEY = "hadoop.trace.spanreceiver.classes";
public class SpanReceiverHost implements TraceAdminProtocol {
public static final String SPAN_RECEIVERS_CONF_KEY =
"hadoop.trace.spanreceiver.classes";
private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
private Collection<SpanReceiver> receivers = new HashSet<SpanReceiver>();
private final TreeMap<Long, SpanReceiver> receivers =
new TreeMap<Long, SpanReceiver>();
private Configuration config;
private boolean closed = false;
private long highestId = 1;
private static enum SingletonHolder {
INSTANCE;
@@ -69,6 +79,8 @@ public void run() {
}
}
private static List<ConfigurationPair> EMPTY = Collections.emptyList();
/**
* Reads the names of classes specified in the
* "hadoop.trace.spanreceiver.classes" property and instantiates and registers
@@ -79,58 +91,75 @@ public void run() {
* called on them. This allows SpanReceivers to use values from the Hadoop
* configuration.
*/
public void loadSpanReceivers(Configuration conf) {
Class<?> implClass = null;
String[] receiverNames = conf.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY);
public synchronized void loadSpanReceivers(Configuration conf) {
config = new Configuration(conf);
String[] receiverNames =
config.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY);
if (receiverNames == null || receiverNames.length == 0) {
return;
}
for (String className : receiverNames) {
className = className.trim();
try {
implClass = Class.forName(className);
receivers.add(loadInstance(implClass, conf));
SpanReceiver rcvr = loadInstance(className, EMPTY);
Trace.addReceiver(rcvr);
receivers.put(highestId++, rcvr);
LOG.info("SpanReceiver " + className + " was loaded successfully.");
} catch (ClassNotFoundException e) {
LOG.warn("Class " + className + " cannot be found.", e);
} catch (IOException e) {
LOG.warn("Load SpanReceiver " + className + " failed.", e);
LOG.error("Failed to load SpanReceiver", e);
}
}
for (SpanReceiver rcvr : receivers) {
Trace.addReceiver(rcvr);
}
}
private SpanReceiver loadInstance(Class<?> implClass, Configuration conf)
throws IOException {
private synchronized SpanReceiver loadInstance(String className,
List<ConfigurationPair> extraConfig) throws IOException {
Class<?> implClass = null;
SpanReceiver impl;
try {
Object o = ReflectionUtils.newInstance(implClass, conf);
implClass = Class.forName(className);
Object o = ReflectionUtils.newInstance(implClass, config);
impl = (SpanReceiver)o;
impl.configure(wrapHadoopConf(conf));
impl.configure(wrapHadoopConf(config, extraConfig));
} catch (ClassCastException e) {
throw new IOException("Class " + className +
" does not implement SpanReceiver.");
} catch (ClassNotFoundException e) {
throw new IOException("Class " + className + " cannot be found.");
} catch (SecurityException e) {
throw new IOException(e);
throw new IOException("Got SecurityException while loading " +
"SpanReceiver " + className);
} catch (IllegalArgumentException e) {
throw new IOException(e);
throw new IOException("Got IllegalArgumentException while loading " +
"SpanReceiver " + className, e);
} catch (RuntimeException e) {
throw new IOException(e);
throw new IOException("Got RuntimeException while loading " +
"SpanReceiver " + className, e);
}
return impl;
}
private static HTraceConfiguration wrapHadoopConf(final Configuration conf) {
private static HTraceConfiguration wrapHadoopConf(final Configuration conf,
List<ConfigurationPair> extraConfig) {
final HashMap<String, String> extraMap = new HashMap<String, String>();
for (ConfigurationPair pair : extraConfig) {
extraMap.put(pair.getKey(), pair.getValue());
}
return new HTraceConfiguration() {
public static final String HTRACE_CONF_PREFIX = "hadoop.";
@Override
public String get(String key) {
if (extraMap.containsKey(key)) {
return extraMap.get(key);
}
return conf.get(HTRACE_CONF_PREFIX + key);
}
@Override
public String get(String key, String defaultValue) {
if (extraMap.containsKey(key)) {
return extraMap.get(key);
}
return conf.get(HTRACE_CONF_PREFIX + key, defaultValue);
}
};
@@ -142,12 +171,67 @@ public String get(String key, String defaultValue) {
public synchronized void closeReceivers() {
if (closed) return;
closed = true;
for (SpanReceiver rcvr : receivers) {
for (SpanReceiver rcvr : receivers.values()) {
try {
rcvr.close();
} catch (IOException e) {
LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e);
}
}
receivers.clear();
}
public synchronized SpanReceiverInfo[] listSpanReceivers()
throws IOException {
SpanReceiverInfo info[] = new SpanReceiverInfo[receivers.size()];
int i = 0;
for(Map.Entry<Long, SpanReceiver> entry : receivers.entrySet()) {
info[i] = new SpanReceiverInfo(entry.getKey(),
entry.getValue().getClass().getName());
i++;
}
return info;
}
public synchronized long addSpanReceiver(SpanReceiverInfo info)
throws IOException {
StringBuilder configStringBuilder = new StringBuilder();
String prefix = "";
for (ConfigurationPair pair : info.configPairs) {
configStringBuilder.append(prefix).append(pair.getKey()).
append(" = ").append(pair.getValue());
prefix = ", ";
}
SpanReceiver rcvr = null;
try {
rcvr = loadInstance(info.getClassName(), info.configPairs);
} catch (IOException e) {
LOG.info("Failed to add SpanReceiver " + info.getClassName() +
" with configuration " + configStringBuilder.toString(), e);
throw e;
} catch (RuntimeException e) {
LOG.info("Failed to add SpanReceiver " + info.getClassName() +
" with configuration " + configStringBuilder.toString(), e);
throw e;
}
Trace.addReceiver(rcvr);
long newId = highestId++;
receivers.put(newId, rcvr);
LOG.info("Successfully added SpanReceiver " + info.getClassName() +
" with configuration " + configStringBuilder.toString());
return newId;
}
public synchronized void removeSpanReceiver(long spanReceiverId)
throws IOException {
SpanReceiver rcvr = receivers.remove(spanReceiverId);
if (rcvr == null) {
throw new IOException("There is no span receiver with id " + spanReceiverId);
}
Trace.removeReceiver(rcvr);
rcvr.close();
LOG.info("Successfully removed SpanReceiver " + spanReceiverId +
" with class " + rcvr.getClass().getName());
}
}
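
A minimal usage sketch for the static configuration path (it assumes the pre-existing getInstance() factory of this class, which is not shown in this hunk): receivers named in hadoop.trace.spanreceiver.classes are loaded at daemon startup, and because wrapHadoopConf() prefixes lookups with "hadoop.", a receiver asking for "local-file-span-receiver.path" reads the Hadoop key "hadoop.local-file-span-receiver.path".

  // Sketch: configure one receiver statically, before any TraceAdmin RPCs.
  Configuration conf = new Configuration();
  conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
      "org.htrace.impl.LocalFileSpanReceiver");
  conf.set("hadoop.local-file-span-receiver.path", "/tmp/spans");
  SpanReceiverHost host = SpanReceiverHost.getInstance(conf); // loads and registers it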

View File

@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SpanReceiverInfo {
private final long id;
private final String className;
final List<ConfigurationPair> configPairs =
new LinkedList<ConfigurationPair>();
static class ConfigurationPair {
private final String key;
private final String value;
ConfigurationPair(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
}
SpanReceiverInfo(long id, String className) {
this.id = id;
this.className = className;
}
public long getId() {
return id;
}
public String getClassName() {
return className;
}
}

View File

@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SpanReceiverInfoBuilder {
private SpanReceiverInfo info;
public SpanReceiverInfoBuilder(String className) {
info = new SpanReceiverInfo(0, className);
}
public void addConfigurationPair(String key, String value) {
info.configPairs.add(new ConfigurationPair(key, value));
}
public SpanReceiverInfo build() {
SpanReceiverInfo ret = info;
info = null;
return ret;
}
}
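
A short sketch of the intended builder usage (the receiver class and configuration key are the ones exercised by the test at the end of this commit). Note that build() hands off the underlying SpanReceiverInfo and nulls the field, so each builder is single-use.

  SpanReceiverInfoBuilder builder =
      new SpanReceiverInfoBuilder("org.htrace.impl.LocalFileSpanReceiver");
  builder.addConfigurationPair("local-file-span-receiver.path", "/tmp/spans");
  SpanReceiverInfo info = builder.build(); // builder must not be reused after this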

View File

@@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.TableListing;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
/**
* A command-line tool for viewing and modifying tracing settings.
*/
@InterfaceAudience.Private
public class TraceAdmin extends Configured implements Tool {
private TraceAdminProtocolPB proxy;
private TraceAdminProtocolTranslatorPB remote;
private void usage() {
PrintStream err = System.err;
err.print(
"Hadoop tracing configuration commands:\n" +
" -add [-class classname] [-Ckey=value] [-Ckey2=value2] ...\n" +
" Add a span receiver with the provided class name. Configuration\n" +
" keys for the span receiver can be specified with the -C options.\n" +
" The span receiver will also inherit whatever configuration keys\n" +
" exist in the daemon's configuration.\n" +
" -help: Print this help message.\n" +
" -host [hostname:port]\n" +
" Specify the hostname and port of the daemon to examine.\n" +
" Required for all commands.\n" +
" -list: List the current span receivers.\n" +
" -remove [id]\n" +
" Remove the span receiver with the specified id. Use -list to\n" +
" find the id of each receiver.\n"
);
}
private int listSpanReceivers(List<String> args) throws IOException {
SpanReceiverInfo infos[] = remote.listSpanReceivers();
if (infos.length == 0) {
System.out.println("[no span receivers found]");
return 0;
}
TableListing listing = new TableListing.Builder().
addField("ID").
addField("CLASS").
showHeaders().
build();
for (SpanReceiverInfo info : infos) {
listing.addRow("" + info.getId(), info.getClassName());
}
System.out.println(listing.toString());
return 0;
}
private final static String CONFIG_PREFIX = "-C";
private int addSpanReceiver(List<String> args) throws IOException {
String className = StringUtils.popOptionWithArgument("-class", args);
if (className == null) {
System.err.println("You must specify the classname with -class.");
return 1;
}
ByteArrayOutputStream configStream = new ByteArrayOutputStream();
PrintStream configsOut = new PrintStream(configStream);
SpanReceiverInfoBuilder factory = new SpanReceiverInfoBuilder(className);
String prefix = "";
for (int i = 0; i < args.size(); ++i) {
String str = args.get(i);
if (!str.startsWith(CONFIG_PREFIX)) {
System.err.println("Can't understand argument: " + str);
return 1;
}
str = str.substring(CONFIG_PREFIX.length());
int equalsIndex = str.indexOf("=");
if (equalsIndex < 0) {
System.err.println("Can't parse configuration argument " + str);
System.err.println("Arguments must be in the form key=value");
return 1;
}
String key = str.substring(0, equalsIndex);
String value = str.substring(equalsIndex + 1);
factory.addConfigurationPair(key, value);
configsOut.print(prefix + key + " = " + value);
prefix = ", ";
}
try {
long id = remote.addSpanReceiver(factory.build());
System.out.println("Added trace span receiver " + id +
" with configuration " + configStream.toString());
} catch (IOException e) {
System.out.println("addSpanReceiver error with configuration " +
configStream.toString());
throw e;
}
return 0;
}
private int removeSpanReceiver(List<String> args) throws IOException {
String indexStr = StringUtils.popFirstNonOption(args);
long id = -1;
try {
id = Long.parseLong(indexStr);
} catch (NumberFormatException e) {
System.err.println("Failed to parse ID string " +
indexStr + ": " + e.getMessage());
return 1;
}
remote.removeSpanReceiver(id);
System.err.println("Removed trace span receiver " + id);
return 0;
}
@Override
public int run(String argv[]) throws Exception {
LinkedList<String> args = new LinkedList<String>();
for (String arg : argv) {
args.add(arg);
}
if (StringUtils.popOption("-h", args) ||
StringUtils.popOption("-help", args)) {
usage();
return 0;
} else if (args.size() == 0) {
usage();
return 0;
}
String hostPort = StringUtils.popOptionWithArgument("-host", args);
if (hostPort == null) {
System.err.println("You must specify a host with -host.");
return 1;
}
if (args.size() == 0) {
System.err.println("You must specify an operation.");
return 1;
}
RPC.setProtocolEngine(getConf(), TraceAdminProtocolPB.class,
ProtobufRpcEngine.class);
InetSocketAddress address = NetUtils.createSocketAddr(hostPort);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Class<?> xface = TraceAdminProtocolPB.class;
proxy = (TraceAdminProtocolPB)RPC.getProxy(xface,
RPC.getProtocolVersion(xface), address,
ugi, getConf(), NetUtils.getDefaultSocketFactory(getConf()), 0);
remote = new TraceAdminProtocolTranslatorPB(proxy);
try {
if (args.get(0).equals("-list")) {
return listSpanReceivers(args.subList(1, args.size()));
} else if (args.get(0).equals("-add")) {
return addSpanReceiver(args.subList(1, args.size()));
} else if (args.get(0).equals("-remove")) {
return removeSpanReceiver(args.subList(1, args.size()));
} else {
System.err.println("Unrecognized tracing command: " + args.get(0));
System.err.println("Use -help for help.");
return 1;
}
} finally {
remote.close();
}
}
public static void main(String[] argv) throws Exception {
TraceAdmin admin = new TraceAdmin();
admin.setConf(new Configuration());
System.exit(admin.run(argv));
}
}
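
The same flow that the new "hadoop trace" shell subcommand exposes can be driven programmatically; a hedged sketch mirroring main() and the test below (the address 127.0.0.1:8020 is only an example NameNode IPC endpoint, and the id passed to -remove is the one printed by -add):

  TraceAdmin admin = new TraceAdmin();
  admin.setConf(new Configuration());
  admin.run(new String[] { "-add", "-host", "127.0.0.1:8020",
      "-class", "org.htrace.impl.LocalFileSpanReceiver",
      "-Clocal-file-span-receiver.path=/tmp/spans" });
  admin.run(new String[] { "-list", "-host", "127.0.0.1:8020" });
  admin.run(new String[] { "-remove", "1", "-host", "127.0.0.1:8020" });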

View File

@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol interface that provides tracing.
*/
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TraceAdminProtocol {
public static final long versionID = 1L;
/**
* List the currently active trace span receivers.
*
* @throws IOException On error.
*/
@Idempotent
public SpanReceiverInfo[] listSpanReceivers() throws IOException;
/**
* Add a new trace span receiver.
*
* @param desc The span receiver description.
* @return The ID of the new trace span receiver.
*
* @throws IOException On error.
*/
@AtMostOnce
public long addSpanReceiver(SpanReceiverInfo desc) throws IOException;
/**
* Remove a trace span receiver.
*
* @param spanReceiverId The id of the span receiver to remove.
* @throws IOException On error.
*/
@AtMostOnce
public void removeSpanReceiver(long spanReceiverId) throws IOException;
}

View File

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService",
protocolVersion = 1)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TraceAdminProtocolPB extends
TraceAdminService.BlockingInterface, VersionedProtocol {
}

View File

@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ConfigPair;
import org.apache.hadoop.tracing.TraceAdminPB.RemoveSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.RemoveSpanReceiverResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
public class TraceAdminProtocolServerSideTranslatorPB
implements TraceAdminProtocolPB, Closeable {
private final TraceAdminProtocol server;
public TraceAdminProtocolServerSideTranslatorPB(TraceAdminProtocol server) {
this.server = server;
}
@Override
public void close() throws IOException {
RPC.stopProxy(server);
}
@Override
public ListSpanReceiversResponseProto listSpanReceivers(
RpcController controller, ListSpanReceiversRequestProto req)
throws ServiceException {
try {
SpanReceiverInfo[] descs = server.listSpanReceivers();
ListSpanReceiversResponseProto.Builder bld =
ListSpanReceiversResponseProto.newBuilder();
for (int i = 0; i < descs.length; ++i) {
bld.addDescriptions(TraceAdminPB.SpanReceiverListInfo.newBuilder().
setId(descs[i].getId()).
setClassName(descs[i].getClassName()).build());
}
return bld.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public AddSpanReceiverResponseProto addSpanReceiver(
RpcController controller, AddSpanReceiverRequestProto req)
throws ServiceException {
try {
SpanReceiverInfoBuilder factory =
new SpanReceiverInfoBuilder(req.getClassName());
for (ConfigPair config : req.getConfigList()) {
factory.addConfigurationPair(config.getKey(), config.getValue());
}
long id = server.addSpanReceiver(factory.build());
return AddSpanReceiverResponseProto.newBuilder().setId(id).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public RemoveSpanReceiverResponseProto removeSpanReceiver(
RpcController controller, RemoveSpanReceiverRequestProto req)
throws ServiceException {
try {
server.removeSpanReceiver(req.getId());
return RemoveSpanReceiverResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return TraceAdminProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
if (!protocol.equals(RPC.getProtocolName(TraceAdminProtocolPB.class))) {
throw new IOException("Serverside implements " +
RPC.getProtocolName(TraceAdminProtocolPB.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(TraceAdminProtocolPB.class),
TraceAdminProtocolPB.class);
}
}
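
The DataNode and NameNodeRpcServer changes further down show how this translator is wired into a daemon's RPC server; in outline (a sketch of that wiring, not additional commit code):

  // traceAdminImpl is the daemon object implementing TraceAdminProtocol
  // (the DataNode and NameNode themselves in this commit).
  TraceAdminProtocolServerSideTranslatorPB xlator =
      new TraceAdminProtocolServerSideTranslatorPB(traceAdminImpl);
  BlockingService service =
      TraceAdminService.newReflectiveBlockingService(xlator);
  // The blocking service is then registered on the existing IPC server, e.g.
  // DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, service, ipcServer);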

View File

@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.AddSpanReceiverResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.ListSpanReceiversResponseProto;
import org.apache.hadoop.tracing.TraceAdminPB.ConfigPair;
import org.apache.hadoop.tracing.TraceAdminPB.RemoveSpanReceiverRequestProto;
import org.apache.hadoop.tracing.TraceAdminPB.SpanReceiverListInfo;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
public class TraceAdminProtocolTranslatorPB implements
TraceAdminProtocol, ProtocolTranslator, Closeable {
private final TraceAdminProtocolPB rpcProxy;
public TraceAdminProtocolTranslatorPB(TraceAdminProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
ArrayList<SpanReceiverInfo> infos = new ArrayList<SpanReceiverInfo>(1);
try {
ListSpanReceiversRequestProto req =
ListSpanReceiversRequestProto.newBuilder().build();
ListSpanReceiversResponseProto resp =
rpcProxy.listSpanReceivers(null, req);
for (SpanReceiverListInfo info : resp.getDescriptionsList()) {
infos.add(new SpanReceiverInfo(info.getId(), info.getClassName()));
}
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return infos.toArray(new SpanReceiverInfo[infos.size()]);
}
@Override
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
try {
AddSpanReceiverRequestProto.Builder bld =
AddSpanReceiverRequestProto.newBuilder();
bld.setClassName(info.getClassName());
for (ConfigurationPair configPair : info.configPairs) {
ConfigPair tuple = ConfigPair.newBuilder().
setKey(configPair.getKey()).
setValue(configPair.getValue()).build();
bld.addConfig(tuple);
}
AddSpanReceiverResponseProto resp =
rpcProxy.addSpanReceiver(null, bld.build());
return resp.getId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void removeSpanReceiver(long spanReceiverId) throws IOException {
try {
RemoveSpanReceiverRequestProto req =
RemoveSpanReceiverRequestProto.newBuilder()
.setId(spanReceiverId).build();
rpcProxy.removeSpanReceiver(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
}

View File

@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
option java_package = "org.apache.hadoop.tracing";
option java_outer_classname = "TraceAdminPB";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.common;
message ListSpanReceiversRequestProto {
}
message SpanReceiverListInfo {
required int64 id = 1;
required string className = 2;
}
message ListSpanReceiversResponseProto {
repeated SpanReceiverListInfo descriptions = 1;
}
message ConfigPair {
required string key = 1;
required string value = 2;
}
message AddSpanReceiverRequestProto {
required string className = 1;
repeated ConfigPair config = 2;
}
message AddSpanReceiverResponseProto {
required int64 id = 1;
}
message RemoveSpanReceiverRequestProto {
required int64 id = 1;
}
message RemoveSpanReceiverResponseProto {
}
service TraceAdminService {
rpc listSpanReceivers(ListSpanReceiversRequestProto)
returns(ListSpanReceiversResponseProto);
rpc addSpanReceiver(AddSpanReceiverRequestProto)
returns(AddSpanReceiverResponseProto);
rpc removeSpanReceiver(RemoveSpanReceiverRequestProto)
returns(RemoveSpanReceiverResponseProto);
}
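
For reference, a small sketch of how the generated Java classes assemble an add request; this mirrors what TraceAdminProtocolTranslatorPB.addSpanReceiver() already does above and uses only the commit's own API (rpcProxy is a TraceAdminProtocolPB proxy obtained via RPC.getProxy, as in TraceAdmin.run()):

  AddSpanReceiverRequestProto req = AddSpanReceiverRequestProto.newBuilder()
      .setClassName("org.htrace.impl.LocalFileSpanReceiver")
      .addConfig(ConfigPair.newBuilder()
          .setKey("local-file-span-receiver.path")
          .setValue("/tmp/spans")
          .build())
      .build();
  AddSpanReceiverResponseProto resp = rpcProxy.addSpanReceiver(null, req);
  long newReceiverId = resp.getId();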

View File

@@ -509,6 +509,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7077. Separate CipherSuite from crypto protocol version. (wang)
HDFS-6956. Allow dynamically changing the tracing level in Hadoop servers
(cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@@ -34,6 +34,7 @@
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.tracing.TraceAdminProtocol;
/**
* {@link PolicyProvider} for HDFS protocols.
@@ -72,7 +73,10 @@ public class HDFSPolicyProvider extends PolicyProvider {
RefreshCallQueueProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH,
GenericRefreshProtocol.class)
GenericRefreshProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_TRACING,
TraceAdminProtocol.class)
};
@Override

View File

@@ -179,6 +179,10 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -189,6 +193,8 @@
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
@@ -230,7 +236,7 @@
@InterfaceAudience.Private
public class DataNode extends ReconfigurableBase
implements InterDatanodeProtocol, ClientDatanodeProtocol,
DataNodeMXBean {
TraceAdminProtocol, DataNodeMXBean {
public static final Log LOG = LogFactory.getLog(DataNode.class);
static{
@@ -699,6 +705,14 @@ private void initIpcServer(Configuration conf) throws IOException {
.newReflectiveBlockingService(interDatanodeProtocolXlator);
DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
ipcServer);
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
ipcServer);
LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
// set service-level authorization security policy
@@ -3025,4 +3039,22 @@ public long getLastDiskErrorCheck() {
return lastDiskErrorCheck;
}
}
@Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
checkSuperuserPrivilege();
return spanReceiverHost.listSpanReceivers();
}
@Override
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
checkSuperuserPrivilege();
return spanReceiverHost.addSpanReceiver(info);
}
@Override
public void removeSpanReceiver(long id) throws IOException {
checkSuperuserPrivilege();
spanReceiverHost.removeSpanReceiver(id);
}
}

View File

@@ -61,6 +61,7 @@
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
@@ -244,6 +245,8 @@ public long getProtocolVersion(String protocol,
return RefreshCallQueueProtocol.versionID;
} else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
return GetUserMappingsProtocol.versionID;
} else if (protocol.equals(TraceAdminProtocol.class.getName())){
return TraceAdminProtocol.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
@@ -279,7 +282,7 @@ public long getProtocolVersion(String protocol,
private JvmPauseMonitor pauseMonitor;
private ObjectName nameNodeStatusBeanName;
private SpanReceiverHost spanReceiverHost;
SpanReceiverHost spanReceiverHost;
/**
* The namenode address that clients will use to access this namenode
* or the name service. For HA configurations using logical URI, it

View File

@@ -165,6 +165,11 @@
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@@ -256,6 +261,11 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
WritableRpcEngine.ensureInitialized();
@@ -300,7 +310,9 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
genericRefreshService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, serviceRpcServer);
// Update the address with the correct port
InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
serviceRPCAddress = new InetSocketAddress(
@@ -346,6 +358,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
genericRefreshService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, clientRpcServer);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, clientRpcServer);
// set service-level authorization security policy
if (serviceAuthEnabled =
@@ -1597,5 +1611,22 @@ public EventsList getEditsFromTxid(long txid) throws IOException {
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
}
}
@Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
namesystem.checkSuperuserPrivilege();
return nn.spanReceiverHost.listSpanReceivers();
}
@Override
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
namesystem.checkSuperuserPrivilege();
return nn.spanReceiverHost.addSpanReceiver(info);
}
@Override
public void removeSpanReceiver(long id) throws IOException {
namesystem.checkSuperuserPrivilege();
nn.spanReceiverHost.removeSpanReceiver(id);
}
}

View File

@@ -26,6 +26,7 @@
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.TraceAdminProtocol;
/** The full set of RPC methods implemented by the Namenode. */
@InterfaceAudience.Private
@@ -38,5 +39,6 @@ public interface NamenodeProtocols
RefreshCallQueueProtocol,
GenericRefreshProtocol,
GetUserMappingsProtocol,
HAServiceProtocol {
HAServiceProtocol,
TraceAdminProtocol {
}

View File

@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
public class TestTraceAdmin {
private String runTraceCommand(TraceAdmin trace, String... cmd)
throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
PrintStream oldStdout = System.out;
PrintStream oldStderr = System.err;
System.setOut(ps);
System.setErr(ps);
int ret = -1;
try {
ret = trace.run(cmd);
} finally {
System.out.flush();
System.setOut(oldStdout);
System.setErr(oldStderr);
}
return "ret:" + ret + ", " + baos.toString();
}
private String getHostPortForNN(MiniDFSCluster cluster) {
return "127.0.0.1:" + cluster.getNameNodePort();
}
@Test
public void testCreateAndDestroySpanReceiver() throws Exception {
Configuration conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, "");
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
TemporarySocketDirectory tempDir = new TemporarySocketDirectory();
String tracePath =
new File(tempDir.getDir(), "tracefile").getAbsolutePath();
try {
TraceAdmin trace = new TraceAdmin();
trace.setConf(conf);
Assert.assertEquals("ret:0, [no span receivers found]\n",
runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster)));
Assert.assertEquals("ret:0, Added trace span receiver 1 with " +
"configuration local-file-span-receiver.path = " + tracePath + "\n",
runTraceCommand(trace, "-add", "-host", getHostPortForNN(cluster),
"-class", "org.htrace.impl.LocalFileSpanReceiver",
"-Clocal-file-span-receiver.path=" + tracePath));
String list =
runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster));
Assert.assertTrue(list.startsWith("ret:0"));
Assert.assertTrue(list.contains("1 org.htrace.impl.LocalFileSpanReceiver"));
Assert.assertEquals("ret:0, Removed trace span receiver 1\n",
runTraceCommand(trace, "-remove", "1", "-host",
getHostPortForNN(cluster)));
Assert.assertEquals("ret:0, [no span receivers found]\n",
runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster)));
} finally {
cluster.shutdown();
tempDir.close();
}
}
}