From 8ab776d61e569c12ec62024415ff68e5d3b10141 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Tue, 10 Apr 2018 11:42:54 -0700 Subject: [PATCH] HADOOP-15340. Provide meaningful RPC server name for RpcMetrics. Contributed by Elek Marton. --- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +- .../main/java/org/apache/hadoop/ipc/RPC.java | 46 +++++++++++++-- .../java/org/apache/hadoop/ipc/Server.java | 9 +++ .../apache/hadoop/ipc/WritableRpcEngine.java | 2 +- .../apache/hadoop/ipc/metrics/RpcMetrics.java | 11 +++- .../java/org/apache/hadoop/ipc/TestRPC.java | 56 ++++++++++++++++++- 6 files changed, 117 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 639bbadffb..70fde60b6c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -419,8 +419,9 @@ public Server(Class protocolClass, Object protocolImpl, String portRangeConfig) throws IOException { super(bindAddress, port, null, numHandlers, - numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl - .getClass().getName()), secretManager, portRangeConfig); + numReaders, queueSizePerHandler, conf, + serverNameFromClass(protocolImpl.getClass()), secretManager, + portRangeConfig); this.verbose = verbose; registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 8f8eda6ded..9cfadc786c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -35,6 +35,8 @@ import java.util.Map; import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.net.SocketFactory; @@ -808,13 +810,45 @@ public Server build() throws IOException, HadoopIllegalArgumentException { /** An RPC Server. */ public abstract static class Server extends org.apache.hadoop.ipc.Server { - boolean verbose; - static String classNameBase(String className) { - String[] names = className.split("\\.", -1); - if (names == null || names.length == 0) { - return className; + + boolean verbose; + + private static final Pattern COMPLEX_SERVER_NAME_PATTERN = + Pattern.compile("(?:[^\\$]*\\$)*([A-Za-z][^\\$]+)(?:\\$\\d+)?"); + + /** + * Get a meaningful and short name for a server based on a java class. + * + * The rules are defined to support the current naming schema of the + * generated protobuf classes where the final class usually an anonymous + * inner class of an inner class. + * + * 1. For simple classes it returns with the simple name of the classes + * (with the name without package name) + * + * 2. For inner classes, this is the simple name of the inner class. + * + * 3. If it is an Object created from a class factory + * E.g., org.apache.hadoop.ipc.TestRPC$TestClass$2 + * this method returns parent class TestClass. + * + * 4. If it is an anonymous class E.g., 'org.apache.hadoop.ipc.TestRPC$10' + * serverNameFromClass returns parent class TestRPC. + * + * + */ + static String serverNameFromClass(Class clazz) { + String name = clazz.getName(); + String[] names = clazz.getName().split("\\.", -1); + if (names != null && names.length > 0) { + name = names[names.length - 1]; + } + Matcher matcher = COMPLEX_SERVER_NAME_PATTERN.matcher(name); + if (matcher.find()) { + return matcher.group(1); + } else { + return name; } - return names[names.length-1]; } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index c5da3b1809..76d9c400b7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -140,6 +140,10 @@ public abstract class Server { private RpcSaslProto negotiateResponse; private ExceptionsHandler exceptionsHandler = new ExceptionsHandler(); private Tracer tracer; + /** + * Logical name of the server used in metrics and monitor. + */ + private final String serverName; /** * Add exception classes for which server won't log stack traces. @@ -2768,6 +2772,7 @@ protected Server(String bindAddress, int port, this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; + this.serverName = serverName; this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { @@ -3509,4 +3514,8 @@ public void run() { idleScanTimer.schedule(idleScanTask, idleScanInterval); } } + + public String getServerName() { + return serverName; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index f2b5862372..049793173f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -406,7 +406,7 @@ public Server(Class protocolClass, Object protocolImpl, throws IOException { super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, - classNameBase(protocolImpl.getClass().getName()), secretManager, + serverNameFromClass(protocolImpl.getClass()), secretManager, portRangeConfig); this.verbose = verbose; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index d53d7d3fb5..a36bcd8648 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.ipc.metrics; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -50,7 +52,9 @@ public class RpcMetrics { String port = String.valueOf(server.getListenerAddress().getPort()); name = "RpcActivityForPort" + port; this.server = server; - registry = new MetricsRegistry("rpc").tag("port", "RPC port", port); + registry = new MetricsRegistry("rpc") + .tag("port", "RPC port", port) + .tag("serverName", "Name of the RPC server", server.getServerName()); int[] intervals = conf.getInts( CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY); rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean( @@ -292,4 +296,9 @@ public double getDeferredRpcProcessingMean() { public double getDeferredRpcProcessingStdDev() { return deferredRpcProcessingTime.lastStat().stddev(); } + + @VisibleForTesting + public MetricsTag getTag(String tagName) { + return registry.getTag(tagName); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index c6209d2483..b59664278d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -48,6 +48,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MockitoUtil; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -90,8 +91,6 @@ import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -447,6 +446,15 @@ private void testCallsInternal(Configuration myConf) throws Exception { assertCounterGt("SentBytes", 0L, rb); assertCounterGt("ReceivedBytes", 0L, rb); + // Check tags of the metrics + assertEquals("" + server.getPort(), + server.getRpcMetrics().getTag("port").value()); + + assertEquals("TestProtobufRpcProto", + server.getRpcMetrics().getTag("serverName").value()); + + + // Number of calls to echo method should be 2 rb = getMetrics(server.rpcDetailedMetrics.name()); assertCounter("EchoNumOps", 2L, rb); @@ -1362,6 +1370,50 @@ public void testClientRpcTimeout() throws Exception { } } + @Test + public void testServerNameFromClass() { + Assert.assertEquals("TestRPC", + RPC.Server.serverNameFromClass(this.getClass())); + Assert.assertEquals("TestClass", + RPC.Server.serverNameFromClass(TestRPC.TestClass.class)); + + Object testing = new TestClass().classFactory(); + Assert.assertEquals("Embedded", + RPC.Server.serverNameFromClass(testing.getClass())); + + testing = new TestClass().classFactoryAbstract(); + Assert.assertEquals("TestClass", + RPC.Server.serverNameFromClass(testing.getClass())); + + testing = new TestClass().classFactoryObject(); + Assert.assertEquals("TestClass", + RPC.Server.serverNameFromClass(testing.getClass())); + + } + + static class TestClass { + class Embedded { + } + + abstract class AbstractEmbedded { + + } + + private Object classFactory() { + return new Embedded(); + } + + private Object classFactoryAbstract() { + return new AbstractEmbedded() { + }; + } + + private Object classFactoryObject() { + return new Object() { + }; + } + + } public static class FakeRequestClass extends RpcWritable { static volatile IOException exception; @Override