HADOOP-15340. Provide meaningful RPC server name for RpcMetrics. Contributed by Elek Marton.
This commit is contained in:
parent
e76c2aeb28
commit
8ab776d61e
@ -419,8 +419,9 @@ public Server(Class<?> protocolClass, Object protocolImpl,
|
|||||||
String portRangeConfig)
|
String portRangeConfig)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(bindAddress, port, null, numHandlers,
|
super(bindAddress, port, null, numHandlers,
|
||||||
numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
|
numReaders, queueSizePerHandler, conf,
|
||||||
.getClass().getName()), secretManager, portRangeConfig);
|
serverNameFromClass(protocolImpl.getClass()), secretManager,
|
||||||
|
portRangeConfig);
|
||||||
this.verbose = verbose;
|
this.verbose = verbose;
|
||||||
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
|
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
|
||||||
protocolImpl);
|
protocolImpl);
|
||||||
|
@ -35,6 +35,8 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
@ -808,13 +810,45 @@ public Server build() throws IOException, HadoopIllegalArgumentException {
|
|||||||
|
|
||||||
/** An RPC Server. */
|
/** An RPC Server. */
|
||||||
public abstract static class Server extends org.apache.hadoop.ipc.Server {
|
public abstract static class Server extends org.apache.hadoop.ipc.Server {
|
||||||
boolean verbose;
|
|
||||||
static String classNameBase(String className) {
|
boolean verbose;
|
||||||
String[] names = className.split("\\.", -1);
|
|
||||||
if (names == null || names.length == 0) {
|
private static final Pattern COMPLEX_SERVER_NAME_PATTERN =
|
||||||
return className;
|
Pattern.compile("(?:[^\\$]*\\$)*([A-Za-z][^\\$]+)(?:\\$\\d+)?");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a meaningful and short name for a server based on a java class.
|
||||||
|
*
|
||||||
|
* The rules are defined to support the current naming schema of the
|
||||||
|
* generated protobuf classes where the final class usually an anonymous
|
||||||
|
* inner class of an inner class.
|
||||||
|
*
|
||||||
|
* 1. For simple classes it returns with the simple name of the classes
|
||||||
|
* (with the name without package name)
|
||||||
|
*
|
||||||
|
* 2. For inner classes, this is the simple name of the inner class.
|
||||||
|
*
|
||||||
|
* 3. If it is an Object created from a class factory
|
||||||
|
* E.g., org.apache.hadoop.ipc.TestRPC$TestClass$2
|
||||||
|
* this method returns parent class TestClass.
|
||||||
|
*
|
||||||
|
* 4. If it is an anonymous class E.g., 'org.apache.hadoop.ipc.TestRPC$10'
|
||||||
|
* serverNameFromClass returns parent class TestRPC.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static String serverNameFromClass(Class<?> clazz) {
|
||||||
|
String name = clazz.getName();
|
||||||
|
String[] names = clazz.getName().split("\\.", -1);
|
||||||
|
if (names != null && names.length > 0) {
|
||||||
|
name = names[names.length - 1];
|
||||||
|
}
|
||||||
|
Matcher matcher = COMPLEX_SERVER_NAME_PATTERN.matcher(name);
|
||||||
|
if (matcher.find()) {
|
||||||
|
return matcher.group(1);
|
||||||
|
} else {
|
||||||
|
return name;
|
||||||
}
|
}
|
||||||
return names[names.length-1];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -140,6 +140,10 @@ public abstract class Server {
|
|||||||
private RpcSaslProto negotiateResponse;
|
private RpcSaslProto negotiateResponse;
|
||||||
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
||||||
private Tracer tracer;
|
private Tracer tracer;
|
||||||
|
/**
|
||||||
|
* Logical name of the server used in metrics and monitor.
|
||||||
|
*/
|
||||||
|
private final String serverName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add exception classes for which server won't log stack traces.
|
* Add exception classes for which server won't log stack traces.
|
||||||
@ -2768,6 +2772,7 @@ protected Server(String bindAddress, int port,
|
|||||||
this.rpcRequestClass = rpcRequestClass;
|
this.rpcRequestClass = rpcRequestClass;
|
||||||
this.handlerCount = handlerCount;
|
this.handlerCount = handlerCount;
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
|
this.serverName = serverName;
|
||||||
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
|
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
|
||||||
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
|
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
|
||||||
if (queueSizePerHandler != -1) {
|
if (queueSizePerHandler != -1) {
|
||||||
@ -3509,4 +3514,8 @@ public void run() {
|
|||||||
idleScanTimer.schedule(idleScanTask, idleScanInterval);
|
idleScanTimer.schedule(idleScanTask, idleScanInterval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getServerName() {
|
||||||
|
return serverName;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -406,7 +406,7 @@ public Server(Class<?> protocolClass, Object protocolImpl,
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
super(bindAddress, port, null, numHandlers, numReaders,
|
super(bindAddress, port, null, numHandlers, numReaders,
|
||||||
queueSizePerHandler, conf,
|
queueSizePerHandler, conf,
|
||||||
classNameBase(protocolImpl.getClass().getName()), secretManager,
|
serverNameFromClass(protocolImpl.getClass()), secretManager,
|
||||||
portRangeConfig);
|
portRangeConfig);
|
||||||
|
|
||||||
this.verbose = verbose;
|
this.verbose = verbose;
|
||||||
|
@ -17,10 +17,12 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ipc.metrics;
|
package org.apache.hadoop.ipc.metrics;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
@ -50,7 +52,9 @@ public class RpcMetrics {
|
|||||||
String port = String.valueOf(server.getListenerAddress().getPort());
|
String port = String.valueOf(server.getListenerAddress().getPort());
|
||||||
name = "RpcActivityForPort" + port;
|
name = "RpcActivityForPort" + port;
|
||||||
this.server = server;
|
this.server = server;
|
||||||
registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
|
registry = new MetricsRegistry("rpc")
|
||||||
|
.tag("port", "RPC port", port)
|
||||||
|
.tag("serverName", "Name of the RPC server", server.getServerName());
|
||||||
int[] intervals = conf.getInts(
|
int[] intervals = conf.getInts(
|
||||||
CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY);
|
CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY);
|
||||||
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
|
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
|
||||||
@ -292,4 +296,9 @@ public double getDeferredRpcProcessingMean() {
|
|||||||
public double getDeferredRpcProcessingStdDev() {
|
public double getDeferredRpcProcessingStdDev() {
|
||||||
return deferredRpcProcessingTime.lastStat().stddev();
|
return deferredRpcProcessingTime.lastStat().stddev();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public MetricsTag getTag(String tagName) {
|
||||||
|
return registry.getTag(tagName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.MetricsAsserts;
|
import org.apache.hadoop.test.MetricsAsserts;
|
||||||
import org.apache.hadoop.test.MockitoUtil;
|
import org.apache.hadoop.test.MockitoUtil;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
@ -90,8 +91,6 @@
|
|||||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNotSame;
|
import static org.junit.Assert.assertNotSame;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -447,6 +446,15 @@ private void testCallsInternal(Configuration myConf) throws Exception {
|
|||||||
assertCounterGt("SentBytes", 0L, rb);
|
assertCounterGt("SentBytes", 0L, rb);
|
||||||
assertCounterGt("ReceivedBytes", 0L, rb);
|
assertCounterGt("ReceivedBytes", 0L, rb);
|
||||||
|
|
||||||
|
// Check tags of the metrics
|
||||||
|
assertEquals("" + server.getPort(),
|
||||||
|
server.getRpcMetrics().getTag("port").value());
|
||||||
|
|
||||||
|
assertEquals("TestProtobufRpcProto",
|
||||||
|
server.getRpcMetrics().getTag("serverName").value());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Number of calls to echo method should be 2
|
// Number of calls to echo method should be 2
|
||||||
rb = getMetrics(server.rpcDetailedMetrics.name());
|
rb = getMetrics(server.rpcDetailedMetrics.name());
|
||||||
assertCounter("EchoNumOps", 2L, rb);
|
assertCounter("EchoNumOps", 2L, rb);
|
||||||
@ -1362,6 +1370,50 @@ public void testClientRpcTimeout() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerNameFromClass() {
|
||||||
|
Assert.assertEquals("TestRPC",
|
||||||
|
RPC.Server.serverNameFromClass(this.getClass()));
|
||||||
|
Assert.assertEquals("TestClass",
|
||||||
|
RPC.Server.serverNameFromClass(TestRPC.TestClass.class));
|
||||||
|
|
||||||
|
Object testing = new TestClass().classFactory();
|
||||||
|
Assert.assertEquals("Embedded",
|
||||||
|
RPC.Server.serverNameFromClass(testing.getClass()));
|
||||||
|
|
||||||
|
testing = new TestClass().classFactoryAbstract();
|
||||||
|
Assert.assertEquals("TestClass",
|
||||||
|
RPC.Server.serverNameFromClass(testing.getClass()));
|
||||||
|
|
||||||
|
testing = new TestClass().classFactoryObject();
|
||||||
|
Assert.assertEquals("TestClass",
|
||||||
|
RPC.Server.serverNameFromClass(testing.getClass()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class TestClass {
|
||||||
|
class Embedded {
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class AbstractEmbedded {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object classFactory() {
|
||||||
|
return new Embedded();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object classFactoryAbstract() {
|
||||||
|
return new AbstractEmbedded() {
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object classFactoryObject() {
|
||||||
|
return new Object() {
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
public static class FakeRequestClass extends RpcWritable {
|
public static class FakeRequestClass extends RpcWritable {
|
||||||
static volatile IOException exception;
|
static volatile IOException exception;
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user