HADOOP-6599 Split existing RpcMetrics into RpcMetrics & RpcDetailedMetrics.

(Suresh Srinivas via Sanjay Radia)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@917737 13f79535-47bb-0310-9956-ffa450edef68
Sanjay Radia 2010-03-01 21:36:23 +00:00
parent b4e4ed2b14
commit 6b9fb8c78b
8 changed files with 233 additions and 33 deletions
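
For orientation, a minimal sketch (not part of the commit; serverName, port, server, methodName, qTime and processingTime are placeholders) of how the two metrics objects divide the work after this split, mirroring the Server and RPC.Server hunks below: aggregate, method-independent counters stay on RpcMetrics, while per-method rates are registered lazily in RpcDetailedMetrics.registry.

// Sketch only -- the real wiring lives in Server's constructor and RPC.Server.call().
RpcMetrics rpcMetrics =
    new RpcMetrics(serverName, Integer.toString(port), server);
RpcDetailedMetrics rpcDetailedMetrics =
    new RpcDetailedMetrics(serverName, Integer.toString(port));

// Aggregate counters (queue/processing time, bytes, auth counts) stay on RpcMetrics.
rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime);

// Per-method timing goes into the detailed registry, created on first use.
MetricsTimeVaryingRate m =
    (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(methodName);
if (m == null) {
  m = new MetricsTimeVaryingRate(methodName, rpcDetailedMetrics.registry);
}
m.inc(processingTime);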

View File

@@ -1,4 +1,5 @@
Hadoop Change Log
# Add directory level at the storage directory
Trunk (unreleased changes)
@@ -181,6 +182,9 @@ Trunk (unreleased changes)
HADOOP-6589. Provide better error messages when RPC authentication fails.
(Kan Zhang via omalley)
HADOOP-6599 Split existing RpcMetrics into RpcMetrics & RpcDetailedMetrics.
(Suresh Srinivas via Sanjay Radia)
OPTIMIZATIONS
HADOOP-6467. Improve the performance on HarFileSystem.listStatus(..).

View File

@@ -64,6 +64,7 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer;
@@ -172,7 +173,8 @@ public static String getRemoteAddress() {
// connections to nuke
//during a cleanup
protected RpcMetrics rpcMetrics;
protected RpcDetailedMetrics rpcDetailedMetrics;
private Configuration conf;
private SecretManager<TokenIdentifier> secretManager;
@@ -1268,8 +1270,9 @@ public Writable run() throws Exception {
// its own message ordering.
setupResponse(buf, call, (error == null) ? Status.SUCCESS
: Status.ERROR, value, errorClass, error);
// Discard the large buf and reset it back to
// smaller size to freeup heap
// Discard the large buf and reset it back to smaller size
// to free up heap
if (buf.size() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "
+ call.toString());
@@ -1336,6 +1339,8 @@ protected Server(String bindAddress, int port,
this.port = listener.getAddress().getPort();
this.rpcMetrics = new RpcMetrics(serverName,
Integer.toString(this.port), this);
this.rpcDetailedMetrics = new RpcDetailedMetrics(serverName,
Integer.toString(this.port));
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
@@ -1450,6 +1455,9 @@ public synchronized void stop() {
if (this.rpcMetrics != null) {
this.rpcMetrics.shutdown();
}
if (this.rpcDetailedMetrics != null) {
this.rpcDetailedMetrics.shutdown();
}
}
/** Wait for the server to be stopped.
@@ -1540,11 +1548,15 @@ public int getCallQueueLen() {
*
* @see WritableByteChannel#write(ByteBuffer)
*/
private static int channelWrite(WritableByteChannel channel,
ByteBuffer buffer) throws IOException {
private int channelWrite(WritableByteChannel channel,
ByteBuffer buffer) throws IOException {
return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
if (count > 0) {
rpcMetrics.sentBytes.inc(count);
}
return count;
}
@@ -1556,11 +1568,15 @@ private static int channelWrite(WritableByteChannel channel,
*
* @see ReadableByteChannel#read(ByteBuffer)
*/
private static int channelRead(ReadableByteChannel channel,
ByteBuffer buffer) throws IOException {
private int channelRead(ReadableByteChannel channel,
ByteBuffer buffer) throws IOException {
return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
if (count > 0) {
rpcMetrics.receivedBytes.inc(count);
}
return count;
}
/**

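
channelWrite and channelRead lose their static modifier above so that they can account traffic on the new SentBytes/ReceivedBytes counters. A small usage sketch (an assumption, mirroring the TestRPC change at the end of this diff; NullContext forces an update pass and is only really suitable for tests):

// 'server' is an org.apache.hadoop.ipc.Server; rpcMetrics is protected, so this
// works from the org.apache.hadoop.ipc package, as in TestRPC.
server.rpcMetrics.doUpdates(new org.apache.hadoop.metrics.spi.NullContext());
long sent = server.rpcMetrics.sentBytes.getPreviousIntervalValue();
long received = server.rpcMetrics.receivedBytes.getPreviousIntervalValue();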
View File

@@ -351,15 +351,15 @@ public Writable call(Class<?> protocol, Writable param, long receivedTime)
rpcMetrics.rpcProcessingTime.inc(processingTime);
MetricsTimeVaryingRate m =
(MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
(MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(call.getMethodName());
if (m == null) {
try {
m = new MetricsTimeVaryingRate(call.getMethodName(),
rpcMetrics.registry);
rpcDetailedMetrics.registry);
} catch (IllegalArgumentException iae) {
// the metrics has been registered; re-fetch the handle
LOG.info("Error register " + call.getMethodName(), iae);
m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
m = (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(
call.getMethodName());
}
}

View File

@@ -55,7 +55,7 @@
*/
public class RpcActivityMBean extends MetricsDynamicMBeanBase {
final private ObjectName mbeanName;
private final ObjectName mbeanName;
/**
*
@@ -63,9 +63,8 @@ public class RpcActivityMBean extends MetricsDynamicMBeanBase {
* @param serviceName - the service name for the rpc service
* @param port - the rpc port.
*/
public RpcActivityMBean(final MetricsRegistry mr, final String serviceName, final String port) {
public RpcActivityMBean(final MetricsRegistry mr, final String serviceName,
final String port) {
super(mr, "Rpc layer statistics");
mbeanName = MBeanUtil.registerMBean(serviceName,
"RpcActivityForPort" + port, this);
@@ -76,5 +75,4 @@ public void shutdown() {
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
}
}

View File

@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc.metrics;
import javax.management.ObjectName;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
import org.apache.hadoop.metrics.util.MetricsRegistry;
/**
*
* This is the JMX MBean for reporting the RPC layer Activity. The MBean is
* registered using the name
* "hadoop:service=<RpcServiceName>,name=RpcDetailedActivityForPort<port>"
*
* Many of the activity metrics are sampled and averaged on an interval which
* can be specified in the metrics config file.
* <p>
* For the metrics that are sampled and averaged, one must specify a metrics
* context that does periodic update calls. Most metrics contexts do. The
* default Null metrics context however does NOT. So if you aren't using any
* other metrics context then you can turn on the viewing and averaging of
* sampled metrics by specifying the following two lines in the
* hadoop-metrics.properties file:
*
* <pre>
* rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
* rpc.period=10
* </pre>
*<p>
* Note that the metrics are collected regardless of the context used. The
* context with the update thread is used to average the data periodically
*
* Impl details: We use a dynamic mbean that gets the list of the metrics from
* the metrics registry passed as an argument to the constructor
*/
public class RpcDetailedActivityMBean extends MetricsDynamicMBeanBase {
private final ObjectName mbeanName;
/**
* @param mr - the metrics registry that has all the metrics
* @param serviceName - the service name for the rpc service
* @param port - the rpc port.
*/
public RpcDetailedActivityMBean(final MetricsRegistry mr,
final String serviceName, final String port) {
super(mr, "Rpc layer detailed statistics");
mbeanName = MBeanUtil.registerMBean(serviceName,
"RpcDetailedActivityForPort" + port, this);
}
public void shutdown() {
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
}
}
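
A usage sketch (not part of the commit) for inspecting the newly registered MBean from the same JVM over standard JMX; the service name "NameNode" and port 8020 are placeholders for whatever the server passed to the constructor, and the attribute list is whatever the dynamic MBean currently exposes:

import java.lang.management.ManagementFactory;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DumpRpcDetailedActivity {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Name pattern documented in the class javadoc above; values are placeholders.
    ObjectName name = new ObjectName(
        "hadoop:service=NameNode,name=RpcDetailedActivityForPort8020");
    // The MBean is dynamic: its attributes are whatever metrics are currently
    // in the registry (roughly one rate per RPC method), so enumerate them.
    for (MBeanAttributeInfo attr : mbs.getMBeanInfo(name).getAttributes()) {
      System.out.println(attr.getName() + " = "
          + mbs.getAttribute(name, attr.getName()));
    }
  }
}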

View File

@@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc.metrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.MetricsBase;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
*
* This class is for maintaining the various RPC method related statistics
* and publishing them through the metrics interfaces.
* This also registers the JMX MBean for RPC.
*/
public class RpcDetailedMetrics implements Updater {
public final MetricsRegistry registry = new MetricsRegistry();
private final MetricsRecord metricsRecord;
private static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class);
RpcDetailedActivityMBean rpcMBean;
/**
* Statically added metrics to expose at least one metric, without
* which other dynamically added metrics are not exposed over JMX.
*/
final MetricsTimeVaryingRate getProtocolVersion =
new MetricsTimeVaryingRate("getProtocolVersion", registry);
public RpcDetailedMetrics(final String hostName, final String port) {
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "detailed-metrics");
metricsRecord.setTag("port", port);
LOG.info("Initializing RPC Metrics with hostName="
+ hostName + ", port=" + port);
context.registerUpdater(this);
// Need to clean up the interface to RpcMgt - don't need both metrics and server params
rpcMBean = new RpcDetailedActivityMBean(registry, hostName, port);
}
/**
* Push the metrics to the monitoring subsystem on doUpdate() call.
*/
public void doUpdates(final MetricsContext context) {
synchronized (this) {
for (MetricsBase m : registry.getMetricsList()) {
m.pushMetric(metricsRecord);
}
}
metricsRecord.update();
}
public void shutdown() {
if (rpcMBean != null)
rpcMBean.shutdown();
}
}
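
Because RpcDetailedMetrics registers itself as an Updater with the "rpc" context and emits a record named "detailed-metrics" tagged with the port, the per-method numbers can also be pushed to any configured metrics context rather than read only over JMX. A sketch of a hadoop-metrics.properties entry, assuming the file-based context from the same metrics framework (the class and property names here are an assumption, not part of this commit):

# emit all rpc-context records, including detailed-metrics, to a file every 10s
rpc.class=org.apache.hadoop.metrics.file.FileContext
rpc.period=10
rpc.fileName=/tmp/rpc-metrics.out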

View File

@@ -28,6 +28,7 @@
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
@@ -43,13 +44,14 @@
*
*/
public class RpcMetrics implements Updater {
public MetricsRegistry registry = new MetricsRegistry();
private MetricsRecord metricsRecord;
private Server myServer;
private static Log LOG = LogFactory.getLog(RpcMetrics.class);
private final MetricsRegistry registry = new MetricsRegistry();
private final MetricsRecord metricsRecord;
private final Server myServer;
private static final Log LOG = LogFactory.getLog(RpcMetrics.class);
RpcActivityMBean rpcMBean;
public RpcMetrics(String hostName, String port, Server server) {
public RpcMetrics(final String hostName, final String port,
final Server server) {
myServer = server;
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "metrics");
@@ -72,26 +74,30 @@ public RpcMetrics(String hostName, String port, Server server) {
* -they can also be read directly - e.g. JMX does this.
*/
public MetricsTimeVaryingRate rpcQueueTime =
public final MetricsTimeVaryingLong receivedBytes =
new MetricsTimeVaryingLong("ReceivedBytes", registry);
public final MetricsTimeVaryingLong sentBytes =
new MetricsTimeVaryingLong("SentBytes", registry);
public final MetricsTimeVaryingRate rpcQueueTime =
new MetricsTimeVaryingRate("RpcQueueTime", registry);
public MetricsTimeVaryingRate rpcProcessingTime =
new MetricsTimeVaryingRate("RpcProcessingTime", registry);
public MetricsIntValue numOpenConnections =
public final MetricsIntValue numOpenConnections =
new MetricsIntValue("NumOpenConnections", registry);
public MetricsIntValue callQueueLen =
public final MetricsIntValue callQueueLen =
new MetricsIntValue("callQueueLen", registry);
public MetricsTimeVaryingInt authenticationFailures =
public final MetricsTimeVaryingInt authenticationFailures =
new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
public MetricsTimeVaryingInt authenticationSuccesses =
public final MetricsTimeVaryingInt authenticationSuccesses =
new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
public MetricsTimeVaryingInt authorizationFailures =
public final MetricsTimeVaryingInt authorizationFailures =
new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
public MetricsTimeVaryingInt authorizationSuccesses =
public final MetricsTimeVaryingInt authorizationSuccesses =
new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
/**
* Push the metrics to the monitoring subsystem on doUpdate() call.
*/
public void doUpdates(MetricsContext context) {
public void doUpdates(final MetricsContext context) {
synchronized (this) {
// ToFix - fix server to use the following two metrics directly so

View File

@@ -34,6 +34,9 @@
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.spi.NullContext;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -252,6 +255,25 @@ public void testCalls(Configuration conf) throws Exception {
stringResult = proxy.echo((String)null);
assertEquals(stringResult, null);
// Check rpcMetrics
server.rpcMetrics.doUpdates(new NullContext());
// Number 4 includes getProtocolVersion()
assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
// Number of calls to echo method should be 2
server.rpcDetailedMetrics.doUpdates(new NullContext());
MetricsTimeVaryingRate metrics =
(MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("echo");
assertEquals(2, metrics.getPreviousIntervalNumOps());
// Number of calls to ping method should be 1
metrics =
(MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("ping");
assertEquals(1, metrics.getPreviousIntervalNumOps());
String[] stringResults = proxy.echo(new String[]{"foo","bar"});
assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));