From 6b9fb8c78b4c18211145deeac7abc5fc0af5d185 Mon Sep 17 00:00:00 2001 From: Sanjay Radia Date: Mon, 1 Mar 2010 21:36:23 +0000 Subject: [PATCH] HADOOP-6599 Split existing RpcMetrics into RpcMetrics & RpcDetailedMetrics. (Suresh Srinivas via Sanjay Radia) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@917737 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + src/java/org/apache/hadoop/ipc/Server.java | 38 ++++++--- .../apache/hadoop/ipc/WritableRpcEngine.java | 6 +- .../hadoop/ipc/metrics/RpcActivityMBean.java | 8 +- .../ipc/metrics/RpcDetailedActivityMBean.java | 72 ++++++++++++++++ .../ipc/metrics/RpcDetailedMetrics.java | 82 +++++++++++++++++++ .../apache/hadoop/ipc/metrics/RpcMetrics.java | 32 +++++--- .../core/org/apache/hadoop/ipc/TestRPC.java | 24 +++++- 8 files changed, 233 insertions(+), 33 deletions(-) create mode 100644 src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java create mode 100644 src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java diff --git a/CHANGES.txt b/CHANGES.txt index dd066ba8ff..80d1cf0bd9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ Hadoop Change Log +# Add directory level at the storage directory Trunk (unreleased changes) @@ -181,6 +182,9 @@ Trunk (unreleased changes) HADOOP-6589. Provide better error messages when RPC authentication fails. (Kan Zhang via omalley) + HADOOP-6599 Split existing RpcMetrics into RpcMetrics & RpcDetailedMetrics. + (Suresh Srinivas via Sanjay Radia) + OPTIMIZATIONS HADOOP-6467. Improve the performance on HarFileSystem.listStatus(..). diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index 1b0e53b444..206f0b2431 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -64,6 +64,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SaslRpcServer; @@ -172,7 +173,8 @@ public static String getRemoteAddress() { // connections to nuke //during a cleanup - protected RpcMetrics rpcMetrics; + protected RpcMetrics rpcMetrics; + protected RpcDetailedMetrics rpcDetailedMetrics; private Configuration conf; private SecretManager secretManager; @@ -1268,8 +1270,9 @@ public Writable run() throws Exception { // its own message ordering. setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); - // Discard the large buf and reset it back to - // smaller size to freeup heap + + // Discard the large buf and reset it back to smaller size + // to free up heap if (buf.size() > maxRespSize) { LOG.warn("Large response size " + buf.size() + " for call " + call.toString()); @@ -1336,6 +1339,8 @@ protected Server(String bindAddress, int port, this.port = listener.getAddress().getPort(); this.rpcMetrics = new RpcMetrics(serverName, Integer.toString(this.port), this); + this.rpcDetailedMetrics = new RpcDetailedMetrics(serverName, + Integer.toString(this.port)); this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); @@ -1450,6 +1455,9 @@ public synchronized void stop() { if (this.rpcMetrics != null) { this.rpcMetrics.shutdown(); } + if (this.rpcDetailedMetrics != null) { + this.rpcDetailedMetrics.shutdown(); + } } /** Wait for the server to be stopped. @@ -1540,11 +1548,15 @@ public int getCallQueueLen() { * * @see WritableByteChannel#write(ByteBuffer) */ - private static int channelWrite(WritableByteChannel channel, - ByteBuffer buffer) throws IOException { + private int channelWrite(WritableByteChannel channel, + ByteBuffer buffer) throws IOException { - return (buffer.remaining() <= NIO_BUFFER_LIMIT) ? - channel.write(buffer) : channelIO(null, channel, buffer); + int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? + channel.write(buffer) : channelIO(null, channel, buffer); + if (count > 0) { + rpcMetrics.sentBytes.inc(count); + } + return count; } @@ -1556,11 +1568,15 @@ private static int channelWrite(WritableByteChannel channel, * * @see ReadableByteChannel#read(ByteBuffer) */ - private static int channelRead(ReadableByteChannel channel, - ByteBuffer buffer) throws IOException { + private int channelRead(ReadableByteChannel channel, + ByteBuffer buffer) throws IOException { - return (buffer.remaining() <= NIO_BUFFER_LIMIT) ? - channel.read(buffer) : channelIO(channel, null, buffer); + int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? + channel.read(buffer) : channelIO(channel, null, buffer); + if (count > 0) { + rpcMetrics.receivedBytes.inc(count); + } + return count; } /** diff --git a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java index b95da2d9b6..ae34f1014c 100644 --- a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -351,15 +351,15 @@ public Writable call(Class protocol, Writable param, long receivedTime) rpcMetrics.rpcProcessingTime.inc(processingTime); MetricsTimeVaryingRate m = - (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName()); + (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(call.getMethodName()); if (m == null) { try { m = new MetricsTimeVaryingRate(call.getMethodName(), - rpcMetrics.registry); + rpcDetailedMetrics.registry); } catch (IllegalArgumentException iae) { // the metrics has been registered; re-fetch the handle LOG.info("Error register " + call.getMethodName(), iae); - m = (MetricsTimeVaryingRate) rpcMetrics.registry.get( + m = (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get( call.getMethodName()); } } diff --git a/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java b/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java index e2b33b7874..3646566069 100644 --- a/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java +++ b/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java @@ -55,7 +55,7 @@ */ public class RpcActivityMBean extends MetricsDynamicMBeanBase { - final private ObjectName mbeanName; + private final ObjectName mbeanName; /** * @@ -63,9 +63,8 @@ public class RpcActivityMBean extends MetricsDynamicMBeanBase { * @param serviceName - the service name for the rpc service * @param port - the rpc port. */ - public RpcActivityMBean(final MetricsRegistry mr, final String serviceName, final String port) { - - + public RpcActivityMBean(final MetricsRegistry mr, final String serviceName, + final String port) { super(mr, "Rpc layer statistics"); mbeanName = MBeanUtil.registerMBean(serviceName, "RpcActivityForPort" + port, this); @@ -76,5 +75,4 @@ public void shutdown() { if (mbeanName != null) MBeanUtil.unregisterMBean(mbeanName); } - } diff --git a/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java b/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java new file mode 100644 index 0000000000..bf490ded58 --- /dev/null +++ b/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc.metrics; + +import javax.management.ObjectName; + +import org.apache.hadoop.metrics.util.MBeanUtil; +import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase; +import org.apache.hadoop.metrics.util.MetricsRegistry; + +/** + * + * This is the JMX MBean for reporting the RPC layer Activity. The MBean is + * register using the name + * "hadoop:service=,name=RpcDetailedActivityForPort" + * + * Many of the activity metrics are sampled and averaged on an interval which + * can be specified in the metrics config file. + *

+ * For the metrics that are sampled and averaged, one must specify a metrics + * context that does periodic update calls. Most metrics contexts do. The + * default Null metrics context however does NOT. So if you aren't using any + * other metrics context then you can turn on the viewing and averaging of + * sampled metrics by specifying the following two lines in the + * hadoop-meterics.properties file: + * + *

+ *        rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
+ *        rpc.period=10
+ * 
+ *

+ * Note that the metrics are collected regardless of the context used. The + * context with the update thread is used to average the data periodically + * + * Impl details: We use a dynamic mbean that gets the list of the metrics from + * the metrics registry passed as an argument to the constructor + */ +public class RpcDetailedActivityMBean extends MetricsDynamicMBeanBase { + private final ObjectName mbeanName; + + /** + * @param mr - the metrics registry that has all the metrics + * @param serviceName - the service name for the rpc service + * @param port - the rpc port. + */ + public RpcDetailedActivityMBean(final MetricsRegistry mr, + final String serviceName, final String port) { + super(mr, "Rpc layer detailed statistics"); + mbeanName = MBeanUtil.registerMBean(serviceName, + "RpcDetailedActivityForPort" + port, this); + } + + public void shutdown() { + if (mbeanName != null) + MBeanUtil.unregisterMBean(mbeanName); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java new file mode 100644 index 0000000000..4050b7b89b --- /dev/null +++ b/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.util.MetricsBase; +import org.apache.hadoop.metrics.util.MetricsRegistry; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; + +/** + * + * This class is for maintaining the various RPC method related statistics + * and publishing them through the metrics interfaces. + * This also registers the JMX MBean for RPC. + */ +public class RpcDetailedMetrics implements Updater { + public final MetricsRegistry registry = new MetricsRegistry(); + private final MetricsRecord metricsRecord; + private static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class); + RpcDetailedActivityMBean rpcMBean; + + /** + * Statically added metrics to expose at least one metrics, without + * which other dynamically added metrics are not exposed over JMX. + */ + final MetricsTimeVaryingRate getProtocolVersion = + new MetricsTimeVaryingRate("getProtocolVersion", registry); + + public RpcDetailedMetrics(final String hostName, final String port) { + MetricsContext context = MetricsUtil.getContext("rpc"); + metricsRecord = MetricsUtil.createRecord(context, "detailed-metrics"); + + metricsRecord.setTag("port", port); + + LOG.info("Initializing RPC Metrics with hostName=" + + hostName + ", port=" + port); + + context.registerUpdater(this); + + // Need to clean up the interface to RpcMgt - don't need both metrics and server params + rpcMBean = new RpcDetailedActivityMBean(registry, hostName, port); + } + + + /** + * Push the metrics to the monitoring subsystem on doUpdate() call. + */ + public void doUpdates(final MetricsContext context) { + + synchronized (this) { + for (MetricsBase m : registry.getMetricsList()) { + m.pushMetric(metricsRecord); + } + } + metricsRecord.update(); + } + + public void shutdown() { + if (rpcMBean != null) + rpcMBean.shutdown(); + } +} diff --git a/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index dd5d2af5c7..c6aec2f428 100644 --- a/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -28,6 +28,7 @@ import org.apache.hadoop.metrics.util.MetricsIntValue; import org.apache.hadoop.metrics.util.MetricsRegistry; import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong; import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; /** @@ -43,13 +44,14 @@ * */ public class RpcMetrics implements Updater { - public MetricsRegistry registry = new MetricsRegistry(); - private MetricsRecord metricsRecord; - private Server myServer; - private static Log LOG = LogFactory.getLog(RpcMetrics.class); + private final MetricsRegistry registry = new MetricsRegistry(); + private final MetricsRecord metricsRecord; + private final Server myServer; + private static final Log LOG = LogFactory.getLog(RpcMetrics.class); RpcActivityMBean rpcMBean; - public RpcMetrics(String hostName, String port, Server server) { + public RpcMetrics(final String hostName, final String port, + final Server server) { myServer = server; MetricsContext context = MetricsUtil.getContext("rpc"); metricsRecord = MetricsUtil.createRecord(context, "metrics"); @@ -72,26 +74,30 @@ public RpcMetrics(String hostName, String port, Server server) { * -they can also be read directly - e.g. JMX does this. */ - public MetricsTimeVaryingRate rpcQueueTime = + public final MetricsTimeVaryingLong receivedBytes = + new MetricsTimeVaryingLong("ReceivedBytes", registry); + public final MetricsTimeVaryingLong sentBytes = + new MetricsTimeVaryingLong("SentBytes", registry); + public final MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime", registry); public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime", registry); - public MetricsIntValue numOpenConnections = + public final MetricsIntValue numOpenConnections = new MetricsIntValue("NumOpenConnections", registry); - public MetricsIntValue callQueueLen = + public final MetricsIntValue callQueueLen = new MetricsIntValue("callQueueLen", registry); - public MetricsTimeVaryingInt authenticationFailures = + public final MetricsTimeVaryingInt authenticationFailures = new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry); - public MetricsTimeVaryingInt authenticationSuccesses = + public final MetricsTimeVaryingInt authenticationSuccesses = new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry); - public MetricsTimeVaryingInt authorizationFailures = + public final MetricsTimeVaryingInt authorizationFailures = new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry); - public MetricsTimeVaryingInt authorizationSuccesses = + public final MetricsTimeVaryingInt authorizationSuccesses = new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry); /** * Push the metrics to the monitoring subsystem on doUpdate() call. */ - public void doUpdates(MetricsContext context) { + public void doUpdates(final MetricsContext context) { synchronized (this) { // ToFix - fix server to use the following two metrics directly so diff --git a/src/test/core/org/apache/hadoop/ipc/TestRPC.java b/src/test/core/org/apache/hadoop/ipc/TestRPC.java index 7f2e170893..b718fcc827 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestRPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestRPC.java @@ -34,6 +34,9 @@ import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.spi.NullContext; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -251,7 +254,26 @@ public void testCalls(Configuration conf) throws Exception { stringResult = proxy.echo((String)null); assertEquals(stringResult, null); - + + // Check rpcMetrics + server.rpcMetrics.doUpdates(new NullContext()); + + // Number 4 includes getProtocolVersion() + assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps()); + assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0); + assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0); + + // Number of calls to echo method should be 2 + server.rpcDetailedMetrics.doUpdates(new NullContext()); + MetricsTimeVaryingRate metrics = + (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("echo"); + assertEquals(2, metrics.getPreviousIntervalNumOps()); + + // Number of calls to ping method should be 1 + metrics = + (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("ping"); + assertEquals(1, metrics.getPreviousIntervalNumOps()); + String[] stringResults = proxy.echo(new String[]{"foo","bar"}); assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));