diff --git a/common/CHANGES.txt b/common/CHANGES.txt
index c6dcc363de..5ff0d025a0 100644
--- a/common/CHANGES.txt
+++ b/common/CHANGES.txt
@@ -12,6 +12,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)
+
     HADOOP-7342. Add an utility API in FileUtil for JDK File.list
     avoid NPEs on File.list() (Bharath Mundlapudi via mattf)
 
diff --git a/common/conf/hadoop-metrics2.properties b/common/conf/hadoop-metrics2.properties
index 7365f6de1f..a6e2f0b16f 100644
--- a/common/conf/hadoop-metrics2.properties
+++ b/common/conf/hadoop-metrics2.properties
@@ -25,3 +25,33 @@
 
 #reducetask.sink.file.filename=reducetask-metrics.out
 
+#
+# Below are for sending metrics to Ganglia
+#
+# for Ganglia 3.0 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
+#
+# for Ganglia 3.1 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
+
+# *.sink.ganglia.period=10
+
+# default for supportsparse is false
+# *.sink.ganglia.supportsparse=true
+
+#*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
+#*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
+
+#namenode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#datanode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#jobtracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#tasktracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#maptask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#reducetask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
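As a concrete illustration of the template above, reporting from the namenode and datanode daemons to two Ganglia 3.1 collectors every 10 seconds amounts to uncommenting and filling in a few lines (a sketch; `gmetad-a` and `gmetad-b` are placeholder hostnames, not part of the patch):

```properties
# Hypothetical filled-in configuration for Ganglia 3.1 (placeholder hosts)
*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.sink.ganglia.period=10
namenode.sink.ganglia.servers=gmetad-a:8649,gmetad-b:8649
datanode.sink.ganglia.servers=gmetad-a:8649,gmetad-b:8649
```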
diff --git a/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
new file mode 100644
index 0000000000..afea7dc555
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * This is the base class for the Ganglia sink classes in metrics2. Much of
+ * the code has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
+ * As per the documentation, sink implementations don't have to worry about
+ * thread safety. Hence this code is not written for thread safety and should
+ * be modified in case the above assumption changes in the future.
+ */
+public abstract class AbstractGangliaSink implements MetricsSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  /*
+   * Output of "gmetric --help" showing allowable values
+   * -t, --type=STRING
+   *     Either string|int8|uint8|int16|uint16|int32|uint32|float|double
+   * -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celsius
+   *     (default='')
+   * -s, --slope=STRING Either zero|positive|negative|both
+   *     (default='both')
+   * -x, --tmax=INT The maximum time in seconds between gmetric calls
+   *     (default='60')
+   */
+  public static final String DEFAULT_UNITS = "";
+  public static final int DEFAULT_TMAX = 60;
+  public static final int DEFAULT_DMAX = 0;
+  public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
+  public static final int DEFAULT_PORT = 8649;
+  public static final String SERVERS_PROPERTY = "servers";
+  public static final int BUFFER_SIZE = 1500; // as per libgmond.c
+  public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
+  public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
+  public static final String EQUAL = "=";
+
+  private String hostName = "UNKNOWN.example.com";
+  private DatagramSocket datagramSocket;
+  private List<? extends SocketAddress> metricsServers;
+  private byte[] buffer = new byte[BUFFER_SIZE];
+  private int offset;
+  private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
+
+  /**
+   * Used for visiting Metrics
+   */
+  protected final GangliaMetricVisitor gangliaMetricVisitor =
+      new GangliaMetricVisitor();
+
+  private SubsetConfiguration conf;
+  private Map<String, GangliaConf> gangliaConfMap;
+  private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();
+
+  /**
+   * ganglia slope values which equal the ordinal
+   */
+  public enum GangliaSlope {
+    zero,       // 0
+    positive,   // 1
+    negative,   // 2
+    both        // 3
+  };
+
+  /**
+   * define enum for various type of conf
+   */
+  public enum GangliaConfType {
+    slope, units, dmax, tmax
+  };
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
+   * .SubsetConfiguration)
+   */
+  public void init(SubsetConfiguration conf) {
+    LOG.debug("Initializing the GangliaSink for Ganglia metrics.");
+
+    this.conf = conf;
+
+    // Take the hostname from the DNS class.
+    if (conf.getString("slave.host.name") != null) {
+      hostName = conf.getString("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+            conf.getString("dfs.datanode.dns.interface", "default"),
+            conf.getString("dfs.datanode.dns.nameserver", "default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+        hostName = "UNKNOWN.example.com";
+      }
+    }
+
+    // load the ganglia servers from properties
+    metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
+        DEFAULT_PORT);
+
+    // extract the Ganglia conf per metrics
+    gangliaConfMap = new HashMap<String, GangliaConf>();
+    loadGangliaConf(GangliaConfType.units);
+    loadGangliaConf(GangliaConfType.tmax);
+    loadGangliaConf(GangliaConfType.dmax);
+    loadGangliaConf(GangliaConfType.slope);
+
+    try {
+      datagramSocket = new DatagramSocket();
+    } catch (SocketException se) {
+      LOG.error(se);
+    }
+
+    // see if sparse metrics are supported; the default is false
+    supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
+        SUPPORT_SPARSE_METRICS_DEFAULT);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.metrics2.MetricsSink#flush()
+   */
+  public void flush() {
+    // nothing to do as we are not buffering data
+  }
+
+  // Load the configurations for a conf type
+  private void loadGangliaConf(GangliaConfType gtype) {
+    String[] propertyarr = conf.getStringArray(gtype.name());
+    if (propertyarr != null && propertyarr.length > 0) {
+      for (String metricNValue : propertyarr) {
+        String[] metricNValueArr = metricNValue.split(EQUAL);
+        if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
+          LOG.error("Invalid propertylist for " + gtype.name());
+          continue; // skip the malformed entry instead of failing below
+        }
+
+        String metricName = metricNValueArr[0].trim();
+        String metricValue = metricNValueArr[1].trim();
+        GangliaConf gconf = gangliaConfMap.get(metricName);
+        if (gconf == null) {
+          gconf = new GangliaConf();
+          gangliaConfMap.put(metricName, gconf);
+        }
+
+        switch (gtype) {
+        case units:
+          gconf.setUnits(metricValue);
+          break;
+        case dmax:
+          gconf.setDmax(Integer.parseInt(metricValue));
+          break;
+        case tmax:
+          gconf.setTmax(Integer.parseInt(metricValue));
+          break;
+        case slope:
+          gconf.setSlope(GangliaSlope.valueOf(metricValue));
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Lookup GangliaConf from cache. If not found, return default values
+   *
+   * @param metricName
+   * @return looked up GangliaConf
+   */
+  protected GangliaConf getGangliaConfForMetric(String metricName) {
+    GangliaConf gconf = gangliaConfMap.get(metricName);
+
+    return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
+  }
+
+  /**
+   * @return the hostName
+   */
+  protected String getHostName() {
+    return hostName;
+  }
+
+  /**
+   * Puts a string into the buffer by first writing the size of the string as an
+   * int, followed by the bytes of the string, padded if necessary to a multiple
+   * of 4.
+   * @param s the string to be written to buffer at offset location
+   */
+  protected void xdr_string(String s) {
+    byte[] bytes = s.getBytes();
+    int len = bytes.length;
+    xdr_int(len);
+    System.arraycopy(bytes, 0, buffer, offset, len);
+    offset += len;
+    pad();
+  }
+
+  // Pads the buffer with zero bytes up to the nearest multiple of 4.
+  private void pad() {
+    int newOffset = ((offset + 3) / 4) * 4;
+    while (offset < newOffset) {
+      buffer[offset++] = 0;
+    }
+  }
+
+  /**
+   * Puts an integer into the buffer as 4 bytes, big-endian.
+   */
+  protected void xdr_int(int i) {
+    buffer[offset++] = (byte) ((i >> 24) & 0xff);
+    buffer[offset++] = (byte) ((i >> 16) & 0xff);
+    buffer[offset++] = (byte) ((i >> 8) & 0xff);
+    buffer[offset++] = (byte) (i & 0xff);
+  }
+
+  /**
+   * Sends Ganglia Metrics to the configured hosts
+   * @throws IOException
+   */
+  protected void emitToGangliaHosts() throws IOException {
+    try {
+      for (SocketAddress socketAddress : metricsServers) {
+        DatagramPacket packet =
+            new DatagramPacket(buffer, offset, socketAddress);
+        datagramSocket.send(packet);
+      }
+    } finally {
+      // reset the buffer for the next metric to be built
+      offset = 0;
+    }
+  }
+
+  /**
+   * Reset the buffer for the next metric to be built
+   */
+  void resetBuffer() {
+    offset = 0;
+  }
+
+  /**
+   * @return whether sparse metrics are supported
+   */
+  protected boolean isSupportSparseMetrics() {
+    return supportSparseMetrics;
+  }
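The xdr_string/xdr_int/pad helpers above implement the small subset of XDR that gmond expects: integers are 4 bytes, big-endian, and a string is a length word followed by the raw bytes, zero-padded to a 4-byte boundary. A standalone, runnable sketch (XdrDemo is a hypothetical class name, not part of this patch) of what xdr_string("cpu") leaves in the buffer:

```java
// Standalone illustration of the XDR layout used by AbstractGangliaSink.
public class XdrDemo {
  public static void main(String[] args) {
    byte[] buffer = new byte[16];
    int offset = 0;
    byte[] bytes = "cpu".getBytes();
    int len = bytes.length;
    // 4-byte big-endian length, as in xdr_int()
    buffer[offset++] = (byte) ((len >> 24) & 0xff);
    buffer[offset++] = (byte) ((len >> 16) & 0xff);
    buffer[offset++] = (byte) ((len >> 8) & 0xff);
    buffer[offset++] = (byte) (len & 0xff);
    // raw bytes, then zero padding to a multiple of 4, as in pad()
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset = ((offset + len + 3) / 4) * 4;
    // prints: 00 00 00 03 63 70 75 00  (length 3, 'c' 'p' 'u', one pad byte)
    for (int i = 0; i < offset; i++) {
      System.out.printf("%02x ", buffer[i]);
    }
    System.out.println();
  }
}
```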
+
+  /**
+   * Used only by unit tests
+   * @param datagramSocket the datagramSocket to set
+   */
+  void setDatagramSocket(DatagramSocket datagramSocket) {
+    this.datagramSocket = datagramSocket;
+  }
+}
diff --git a/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
new file mode 100644
index 0000000000..b75e409bbb
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * Class which is used to store ganglia properties
+ */
+class GangliaConf {
+  private String units = AbstractGangliaSink.DEFAULT_UNITS;
+  private GangliaSlope slope;
+  private int dmax = AbstractGangliaSink.DEFAULT_DMAX;
+  private int tmax = AbstractGangliaSink.DEFAULT_TMAX;
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("unit=").append(units).append(", slope=").append(slope)
+        .append(", dmax=").append(dmax).append(", tmax=").append(tmax);
+    return buf.toString();
+  }
+
+  /**
+   * @return the units
+   */
+  String getUnits() {
+    return units;
+  }
+
+  /**
+   * @param units the units to set
+   */
+  void setUnits(String units) {
+    this.units = units;
+  }
+
+  /**
+   * @return the slope
+   */
+  GangliaSlope getSlope() {
+    return slope;
+  }
+
+  /**
+   * @param slope the slope to set
+   */
+  void setSlope(GangliaSlope slope) {
+    this.slope = slope;
+  }
+
+  /**
+   * @return the dmax
+   */
+  int getDmax() {
+    return dmax;
+  }
+
+  /**
+   * @param dmax the dmax to set
+   */
+  void setDmax(int dmax) {
+    this.dmax = dmax;
+  }
+
+  /**
+   * @return the tmax
+   */
+  int getTmax() {
+    return tmax;
+  }
+
+  /**
+   * @param tmax the tmax to set
+   */
+  void setTmax(int tmax) {
+    this.tmax = tmax;
+  }
+}
\ No newline at end of file
diff --git a/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
new file mode 100644
index 0000000000..f478191104
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * Since implementations of Metric are not public, a visitor is used to figure
+ * out the type and slope of a metric. Counters have "positive" slope.
+ */
+class GangliaMetricVisitor implements MetricsVisitor {
+  private static final String INT32 = "int32";
+  private static final String FLOAT = "float";
+  private static final String DOUBLE = "double";
+
+  private String type;
+  private GangliaSlope slope;
+
+  /**
+   * @return the type of a visited metric
+   */
+  String getType() {
+    return type;
+  }
+
+  /**
+   * @return the slope of a visited metric. Slope is positive for counters and
+   *         null for others
+   */
+  GangliaSlope getSlope() {
+    return slope;
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, int value) {
+    // MetricGaugeInt.class ==> "int32"
+    type = INT32;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, long value) {
+    // MetricGaugeLong.class ==> "float"
+    type = FLOAT;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, float value) {
+    // MetricGaugeFloat.class ==> "float"
+    type = FLOAT;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, double value) {
+    // MetricGaugeDouble.class ==> "double"
+    type = DOUBLE;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void counter(MetricsInfo info, int value) {
+    // MetricCounterInt.class ==> "int32"
+    type = INT32;
+    // counters have positive slope
+    slope = GangliaSlope.positive;
+  }
+
+  @Override
+  public void counter(MetricsInfo info, long value) {
+    // MetricCounterLong.class ==> "float"
+    type = FLOAT;
+    // counters have positive slope
+    slope = GangliaSlope.positive;
+  }
+}
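The visitor is how the sinks recover a Ganglia type and slope from an AbstractMetric without access to its concrete class. Note that long-valued gauges and counters are reported as "float", since this wire format carries no 64-bit integer type. A usage sketch (valid only inside this package, since the class is package-private; `metric` stands for any AbstractMetric taken from a record):

```java
// Sketch, assuming code in the org.apache.hadoop.metrics2.sink.ganglia package.
GangliaMetricVisitor visitor = new GangliaMetricVisitor();
metric.visit(visitor);                    // dispatches to gauge()/counter()
String type = visitor.getType();          // e.g. "int32" for an int counter
GangliaSlope slope = visitor.getSlope();  // positive for counters, null for gauges
```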
diff --git a/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
new file mode 100644
index 0000000000..8d90101f2c
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.util.MetricsCache;
+import org.apache.hadoop.metrics2.util.MetricsCache.Record;
+
+/**
+ * This code supports Ganglia 3.0
+ *
+ */
+public class GangliaSink30 extends AbstractGangliaSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  private MetricsCache metricsCache = new MetricsCache();
+
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    // This method handles both dense publishing of metrics (the default)
+    // and sparse (only-on-change) publishing of metrics
+    try {
+      String recordName = record.name();
+      String contextName = record.context();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(contextName);
+      sb.append('.');
+      sb.append(recordName);
+
+      String groupName = sb.toString();
+      sb.append('.');
+      int sbBaseLen = sb.length();
+
+      String type = null;
+      GangliaSlope slopeFromMetric = null;
+      GangliaSlope calculatedSlope = null;
+      Record cachedMetrics = null;
+      resetBuffer(); // reset the buffer to the beginning
+      if (!isSupportSparseMetrics()) {
+        // for sending dense metrics, update metrics cache
+        // and get the updated data
+        cachedMetrics = metricsCache.update(record);
+
+        if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) {
+          for (Map.Entry<String, AbstractMetric> entry : cachedMetrics
+              .metricsEntrySet()) {
+            AbstractMetric metric = entry.getValue();
+            sb.append(metric.name());
+            String name = sb.toString();
+
+            // visit the metric to identify the Ganglia type and
+            // slope
+            metric.visit(gangliaMetricVisitor);
+            type = gangliaMetricVisitor.getType();
+            slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+            GangliaConf gConf = getGangliaConfForMetric(name);
+            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+            // send metric to Ganglia
+            emitMetric(groupName, name, type, metric.value().toString(), gConf,
+                calculatedSlope);
+
+            // reset the length of the buffer for next iteration
+            sb.setLength(sbBaseLen);
+          }
+        }
+      } else {
+        // we support sparse updates
+
+        Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record
+            .metrics();
+        if (metrics.size() > 0) {
+          // we got metrics, so send the latest
+          for (AbstractMetric metric : record.metrics()) {
+            sb.append(metric.name());
+            String name = sb.toString();
+
+            // visit the metric to identify the Ganglia type and
+            // slope
+            metric.visit(gangliaMetricVisitor);
+            type = gangliaMetricVisitor.getType();
+            slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+            GangliaConf gConf = getGangliaConfForMetric(name);
+            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+            // send metric to Ganglia
+            emitMetric(groupName, name, type, metric.value().toString(), gConf,
+                calculatedSlope);
+
+            // reset the length of the buffer for next iteration
+            sb.setLength(sbBaseLen);
+          }
+        }
+      }
+    } catch (IOException io) {
+      throw new MetricsException("Failed to putMetrics", io);
+    }
+  }
+
+  // Calculate the slope from properties and metric
+  private GangliaSlope calculateSlope(GangliaConf gConf,
+      GangliaSlope slopeFromMetric) {
+    if (gConf.getSlope() != null) {
+      // if slope has been specified in properties, use that
+      return gConf.getSlope();
+    } else if (slopeFromMetric != null) {
+      // slope not specified in properties, use the one derived from the Metric
+      return slopeFromMetric;
+    } else {
+      return DEFAULT_SLOPE;
+    }
+  }
+
+  /**
+   * The method sends metrics to Ganglia servers. The method has been taken from
+   * org.apache.hadoop.metrics.ganglia.GangliaContext30 with minimal changes in
+   * order to keep it in sync.
+   * @param groupName The group name of the metric
+   * @param name The metric name
+   * @param type The type of the metric
+   * @param value The value of the metric
+   * @param gConf The GangliaConf for this metric
+   * @param gSlope The slope for this metric
+   * @throws IOException
+   */
+  protected void emitMetric(String groupName, String name, String type,
+      String value, GangliaConf gConf, GangliaSlope gSlope) throws IOException {
+
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name + " was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Emitting metric " + name + ", type " + type + ", value "
+          + value + ", slope " + gSlope.name() + " from hostname "
+          + getHostName());
+    }
+
+    xdr_int(0);                   // metric_user_defined
+    xdr_string(type);
+    xdr_string(name);
+    xdr_string(value);
+    xdr_string(gConf.getUnits());
+    xdr_int(gSlope.ordinal());
+    xdr_int(gConf.getTmax());
+    xdr_int(gConf.getDmax());
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+  }
+}
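Combined with the XDR helpers, emitMetric() above produces one UDP datagram per metric per period. A minimal, runnable sketch of the Ganglia 3.0 field order (Ganglia30PacketDemo and the metric name are hypothetical; the values mirror the defaults in AbstractGangliaSink):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Ganglia30PacketDemo {
  // Same encoding as AbstractGangliaSink.xdr_string(): length, bytes, padding.
  private static void xdrString(DataOutputStream out, String s) throws IOException {
    byte[] b = s.getBytes();
    out.writeInt(b.length);                       // 4-byte big-endian length
    out.write(b);                                 // raw bytes
    out.write(new byte[(4 - b.length % 4) % 4]);  // zero pad to 4-byte boundary
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    // Field order as written by GangliaSink30.emitMetric(); example values only.
    out.writeInt(0);                  // metric_user_defined
    xdrString(out, "int32");          // type, from GangliaMetricVisitor
    xdrString(out, "test.s1rec.C1");  // metric name (hypothetical)
    xdrString(out, "1");              // value, always a string on the wire
    xdrString(out, "");               // units (DEFAULT_UNITS)
    out.writeInt(1);                  // slope: GangliaSlope.positive.ordinal()
    out.writeInt(60);                 // tmax (DEFAULT_TMAX)
    out.writeInt(0);                  // dmax (DEFAULT_DMAX, retain forever)
    System.out.println(buf.size() + " bytes, sent as one datagram per server");
  }
}
```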
diff --git a/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
new file mode 100644
index 0000000000..c8315e8e14
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This code supports Ganglia 3.1
+ *
+ */
+public class GangliaSink31 extends GangliaSink30 {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  /**
+   * The method sends metrics to Ganglia servers. The method has been taken from
+   * org.apache.hadoop.metrics.ganglia.GangliaContext31 with minimal changes in
+   * order to keep it in sync.
+   * @param groupName The group name of the metric
+   * @param name The metric name
+   * @param type The type of the metric
+   * @param value The value of the metric
+   * @param gConf The GangliaConf for this metric
+   * @param gSlope The slope for this metric
+   * @throws IOException
+   */
+  protected void emitMetric(String groupName, String name, String type,
+      String value, GangliaConf gConf, GangliaSlope gSlope)
+      throws IOException {
+
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name + " was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Emitting metric " + name + ", type " + type + ", value " + value
+          + ", slope " + gSlope.name() + " from hostname " + getHostName());
+    }
+
+    // The following XDR recipe was done through a careful reading of
+    // gm_protocol.x in Ganglia 3.1 and carefully examining the output of
+    // the gmetric utility with strace.
+
+    // First we send out a metadata message
+    xdr_int(128);                 // metric_id = metadata_msg
+    xdr_string(getHostName());    // hostname
+    xdr_string(name);             // metric name
+    xdr_int(0);                   // spoof = False
+    xdr_string(type);             // metric type
+    xdr_string(name);             // metric name
+    xdr_string(gConf.getUnits()); // units
+    xdr_int(gSlope.ordinal());    // slope
+    xdr_int(gConf.getTmax());     // tmax, the maximum time between metrics
+    xdr_int(gConf.getDmax());     // dmax, seconds to retain the value
+    xdr_int(1);                   /*Num of the entries in extra_value field for
+                                    Ganglia 3.1.x*/
+    xdr_string("GROUP");          /*Group attribute*/
+    xdr_string(groupName);        /*Group value*/
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+
+    // Now we send out a message with the actual value.
+    // Technically, we only need to send out the metadata message once for
+    // each metric, but I don't want to have to record which metrics we did and
+    // did not send.
+    xdr_int(133);                 // we are sending a string value
+    xdr_string(getHostName());    // hostName
+    xdr_string(name);             // metric name
+    xdr_int(0);                   // spoof = False
+    xdr_string("%s");             // format field
+    xdr_string(value);            // metric value
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+  }
+}
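Unlike 3.0, Ganglia 3.1 splits each metric into two datagrams: a metadata message (id 128) and a value message (id 133), with the group name carried as an extra_value attribute. A runnable sketch of the pair (Ganglia31PacketDemo, the host, and the metric names are placeholders):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Ganglia31PacketDemo {
  private static void xdrString(DataOutputStream out, String s) throws IOException {
    byte[] b = s.getBytes();
    out.writeInt(b.length);
    out.write(b);
    out.write(new byte[(4 - b.length % 4) % 4]); // pad to 4-byte boundary
  }

  public static void main(String[] args) throws IOException {
    String host = "node1.example.com", name = "test.s1rec.C1"; // placeholders

    // Datagram 1: metadata message (metric_id 128)
    ByteArrayOutputStream meta = new ByteArrayOutputStream();
    DataOutputStream m = new DataOutputStream(meta);
    m.writeInt(128);              // metadata_msg
    xdrString(m, host);           // hostname
    xdrString(m, name);           // metric name
    m.writeInt(0);                // spoof = false
    xdrString(m, "int32");        // type
    xdrString(m, name);           // metric name, repeated
    xdrString(m, "");             // units
    m.writeInt(1);                // slope (positive)
    m.writeInt(60);               // tmax
    m.writeInt(0);                // dmax
    m.writeInt(1);                // one extra_value entry...
    xdrString(m, "GROUP");        // ...the group attribute
    xdrString(m, "test.s1rec");   // group value

    // Datagram 2: value message (metric_id 133, string-formatted)
    ByteArrayOutputStream val = new ByteArrayOutputStream();
    DataOutputStream v = new DataOutputStream(val);
    v.writeInt(133);              // string value message
    xdrString(v, host);
    xdrString(v, name);
    v.writeInt(0);                // spoof = false
    xdrString(v, "%s");           // format field
    xdrString(v, "1");            // the value itself

    System.out.println(meta.size() + " + " + val.size() + " bytes per metric");
  }
}
```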
diff --git a/common/src/java/org/apache/hadoop/metrics2/util/MetricsCache.java b/common/src/java/org/apache/hadoop/metrics2/util/MetricsCache.java
index e88d3e730b..efcb286fae 100644
--- a/common/src/java/org/apache/hadoop/metrics2/util/MetricsCache.java
+++ b/common/src/java/org/apache/hadoop/metrics2/util/MetricsCache.java
@@ -23,9 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,6 +31,9 @@
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+
 /**
  * A metrics cache for sinks that don't support sparse updates.
  */
@@ -68,7 +68,7 @@ protected boolean removeEldestEntry(Map.Entry<Collection<MetricsTag>,
    */
   public static class Record {
     final Map<String, String> tags = Maps.newHashMap();
-    final Map<String, Number> metrics = Maps.newHashMap();
+    final Map<String, AbstractMetric> metrics = Maps.newHashMap();
 
     /**
      * Lookup a tag value
@@ -85,6 +85,16 @@ public String getTag(String key) {
      * @return the metric value
      */
     public Number getMetric(String key) {
+      AbstractMetric metric = metrics.get(key);
+      return metric != null ? metric.value() : null;
+    }
+
+    /**
+     * Lookup a metric instance
+     * @param key name of the metric
+     * @return the metric instance
+     */
+    public AbstractMetric getMetricInstance(String key) {
       return metrics.get(key);
     }
 
@@ -96,9 +106,23 @@ public Set<Map.Entry<String, String>> tags() {
     }
 
     /**
-     * @return entry set of the metrics of the record
+     * @deprecated use metricsEntrySet() instead
+     * @return entry set of metrics
      */
+    @Deprecated
     public Set<Map.Entry<String, Number>> metrics() {
+      Map<String, Number> map = new LinkedHashMap<String, Number>(
+          metrics.size());
+      for (Map.Entry<String, AbstractMetric> mapEntry : metrics.entrySet()) {
+        map.put(mapEntry.getKey(), mapEntry.getValue().value());
+      }
+      return map.entrySet();
+    }
+
+    /**
+     * @return entry set of metrics
+     */
+    public Set<Map.Entry<String, AbstractMetric>> metricsEntrySet() {
       return metrics.entrySet();
     }
 
@@ -141,7 +165,7 @@ record = new Record();
       recordCache.put(tags, record);
     }
     for (AbstractMetric m : mr.metrics()) {
-      record.metrics.put(m.name(), m.value());
+      record.metrics.put(m.name(), m);
    }
     if (includingTags) {
       // mostly for some sinks that include tags as part of a dense schema
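The Record class now caches AbstractMetric instances rather than plain Number values, which is what lets GangliaSink30 re-visit cached metrics for type and slope; old callers keep working through the deprecated metrics() view. A sketch of the two access patterns (MetricsCacheUsageSketch is a hypothetical helper, not part of this patch):

```java
import java.util.Map;

import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.util.MetricsCache;

class MetricsCacheUsageSketch {
  static void dump(MetricsCache.Record cachedRecord) {
    // Deprecated view: plain values, copied into a fresh map on every call
    for (Map.Entry<String, Number> e : cachedRecord.metrics()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
    // New view: the live AbstractMetric entries, which can be visited again
    for (Map.Entry<String, AbstractMetric> e : cachedRecord.metricsEntrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue().value());
    }
  }
}
```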
diff --git a/common/src/test/core/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java b/common/src/test/core/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
new file mode 100644
index 0000000000..9d78ba77bc
--- /dev/null
+++ b/common/src/test/core/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaMetricsTestHelper;
+import org.junit.Test;
+
+public class TestGangliaMetrics {
+  public static final Log LOG = LogFactory.getLog(TestGangliaMetrics.class);
+  private final String[] expectedMetrics =
+    { "test.s1rec.C1",
+      "test.s1rec.G1",
+      "test.s1rec.Xxx",
+      "test.s1rec.Yyy",
+      "test.s1rec.S1NumOps",
+      "test.s1rec.S1AvgTime" };
+
+  @Test public void testGangliaMetrics2() throws Exception {
+    ConfigBuilder cb = new ConfigBuilder().add("default.period", 10)
+        .add("test.sink.gsink30.context", "test") // filter out only "test"
+        .add("test.sink.gsink31.context", "test") // filter out only "test"
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    TestSource s1 = ms.register("s1", "s1 desc", new TestSource("s1rec"));
+    s1.c1.incr();
+    s1.xxx.incr();
+    s1.g1.set(2);
+    s1.yyy.incr(2);
+    s1.s1.add(0);
+
+    final int expectedCountFromGanglia30 = expectedMetrics.length;
+    final int expectedCountFromGanglia31 = 2 * expectedMetrics.length;
+
+    // use latch to make sure we received required records before shutting
+    // down the MetricsSystem
+    CountDownLatch latch = new CountDownLatch(
+        expectedCountFromGanglia30 + expectedCountFromGanglia31);
+
+    // Setup test for GangliaSink30
+    AbstractGangliaSink gsink30 = new GangliaSink30();
+    gsink30.init(cb.subset("test"));
+    MockDatagramSocket mockds30 = new MockDatagramSocket(latch);
+    GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
+
+    // Setup test for GangliaSink31
+    AbstractGangliaSink gsink31 = new GangliaSink31();
+    gsink31.init(cb.subset("test"));
+    MockDatagramSocket mockds31 = new MockDatagramSocket(latch);
+    GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
+
+    // register the sinks
+    ms.register("gsink30", "gsink30 desc", gsink30);
+    ms.register("gsink31", "gsink31 desc", gsink31);
+    ms.onTimerEvent(); // trigger something interesting
+
+    // wait for all records, then stop the MetricsSystem; without this,
+    // the ms sometimes gets shut down before all the sinks have consumed
+    latch.await(200, TimeUnit.MILLISECONDS);
+    ms.stop();
+
+    // check GangliaSink30 data
+    checkMetrics(mockds30.getCapturedSend(), expectedCountFromGanglia30);
+
+    // check GangliaSink31 data
+    checkMetrics(mockds31.getCapturedSend(), expectedCountFromGanglia31);
+  }
+
+
+  // check the expected against the actual metrics
+  private void checkMetrics(List<byte[]> bytearrlist, int expectedCount) {
+    boolean[] foundMetrics = new boolean[expectedMetrics.length];
+    for (byte[] bytes : bytearrlist) {
+      String binaryStr = new String(bytes);
+      for (int index = 0; index < expectedMetrics.length; index++) {
+        if (binaryStr.indexOf(expectedMetrics[index]) >= 0) {
+          foundMetrics[index] = true;
+          break;
+        }
+      }
+    }
+
+    for (int index = 0; index < foundMetrics.length; index++) {
+      if (!foundMetrics[index]) {
+        assertTrue("Missing metrics: " + expectedMetrics[index], false);
+      }
+    }
+
+    assertEquals("Mismatch in record count: ",
+        expectedCount, bytearrlist.size());
+  }
+
+  @SuppressWarnings("unused")
+  @Metrics(context="test")
+  private static class TestSource {
+    @Metric("C1 desc") MutableCounterLong c1;
+    @Metric("XXX desc") MutableCounterLong xxx;
+    @Metric("G1 desc") MutableGaugeLong g1;
+    @Metric("YYY desc") MutableGaugeLong yyy;
+    @Metric MutableRate s1;
+    final MetricsRegistry registry;
+
+    TestSource(String recName) {
+      registry = new MetricsRegistry(recName);
+    }
+  }
+
+  /**
+   * This class is used to capture data sent to Ganglia servers.
+   *
+   * The initial attempt was to use Mockito to mock and capture, but
+   * Mockito keeps a reference to the byte array, and since the sink
+   * code reuses its byte array, all the captured arrays ended up
+   * pointing at the same instance.
+   */
+  private class MockDatagramSocket extends DatagramSocket {
+    private ArrayList<byte[]> capture;
+    private CountDownLatch latch;
+
+    /**
+     * @throws SocketException
+     */
+    public MockDatagramSocket() throws SocketException {
+      capture = new ArrayList<byte[]>();
+    }
+
+    /**
+     * @param latch
+     * @throws SocketException
+     */
+    public MockDatagramSocket(CountDownLatch latch) throws SocketException {
+      this();
+      this.latch = latch;
+    }
+
+    /* (non-Javadoc)
+     * @see java.net.DatagramSocket#send(java.net.DatagramPacket)
+     */
+    @Override
+    public void send(DatagramPacket p) throws IOException {
+      // capture the byte arrays
+      byte[] bytes = new byte[p.getLength()];
+      System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
+      capture.add(bytes);
+
+      // decrement the latch
+      latch.countDown();
+    }
+
+    /**
+     * @return the captured byte arrays
+     */
+    ArrayList<byte[]> getCapturedSend() {
+      return capture;
+    }
+  }
+}
diff --git a/common/src/test/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java b/common/src/test/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
new file mode 100644
index 0000000000..ba7e1c12aa
--- /dev/null
+++ b/common/src/test/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.net.DatagramSocket;
+
+/**
+ * Helper class in the same package as the ganglia sinks, used by unit tests
+ */
+public class GangliaMetricsTestHelper {
+
+  /**
+   * Helper method to access the package-private setDatagramSocket() method,
+   * needed for unit tests
+   * @param gangliaSink
+   * @param datagramSocket
+   */
+  public static void setDatagramSocket(AbstractGangliaSink gangliaSink,
+      DatagramSocket datagramSocket) {
+
+    gangliaSink.setDatagramSocket(datagramSocket);
+  }
+}
diff --git a/common/src/test/core/org/apache/hadoop/metrics2/util/TestMetricsCache.java b/common/src/test/core/org/apache/hadoop/metrics2/util/TestMetricsCache.java
index 09cd5f9185..7bee3a2008 100644
--- a/common/src/test/core/org/apache/hadoop/metrics2/util/TestMetricsCache.java
+++ b/common/src/test/core/org/apache/hadoop/metrics2/util/TestMetricsCache.java
@@ -35,6 +35,7 @@ public class TestMetricsCache {
   private static final Log LOG = LogFactory.getLog(TestMetricsCache.class);
 
+  @SuppressWarnings("deprecation")
   @Test public void testUpdate() {
     MetricsCache cache = new MetricsCache();
     MetricsRecord mr = makeRecord("r",
@@ -54,25 +55,26 @@ public class TestMetricsCache {
         Arrays.asList(makeMetric("m", 2), makeMetric("m2", 42)));
     cr = cache.update(mr2);
     assertEquals("contains 3 metric", 3, cr.metrics().size());
-    assertEquals("updated metric value", 2, cr.getMetric("m"));
-    assertEquals("old metric value", 1, cr.getMetric("m1"));
-    assertEquals("new metric value", 42, cr.getMetric("m2"));
+    checkMetricValue("updated metric value", cr, "m", 2);
+    checkMetricValue("old metric value", cr, "m1", 1);
+    checkMetricValue("new metric value", cr, "m2", 42);
 
     MetricsRecord mr3 = makeRecord("r",
         Arrays.asList(makeTag("t", "tv3")), // different tag value
         Arrays.asList(makeMetric("m3", 3)));
     cr = cache.update(mr3); // should get a new record
     assertEquals("contains 1 metric", 1, cr.metrics().size());
-    assertEquals("updated metric value", 3, cr.getMetric("m3"));
+    checkMetricValue("updated metric value", cr, "m3", 3);
     // tags cache should be empty so far
     assertEquals("no tags", 0, cr.tags().size());
     // until now
     cr = cache.update(mr3, true);
     assertEquals("Got 1 tag", 1, cr.tags().size());
     assertEquals("Tag value", "tv3", cr.getTag("t"));
-    assertEquals("Metric value", 3, cr.getMetric("m3"));
+    checkMetricValue("Metric value", cr, "m3", 3);
   }
 
+  @SuppressWarnings("deprecation")
   @Test public void testGet() {
     MetricsCache cache = new MetricsCache();
     assertNull("empty", cache.get("r", Arrays.asList(makeTag("t", "t"))));
@@ -85,7 +87,7 @@ public class TestMetricsCache {
 
     assertNotNull("Got record", cr);
     assertEquals("contains 1 metric", 1, cr.metrics().size());
-    assertEquals("new metric value", 1, cr.getMetric("m"));
+    checkMetricValue("new metric value", cr, "m", 1);
   }
 
   /**
@@ -109,7 +111,7 @@ public class TestMetricsCache {
       cr = cache.update(makeRecord("r",
           Arrays.asList(makeTag("t"+ i, ""+ i)),
          Arrays.asList(makeMetric("m", i))));
-      assertEquals("new metrics value", i, cr.getMetric("m"));
+      checkMetricValue("new metric value", cr, "m", i);
      if (i < MetricsCache.MAX_RECS_PER_NAME_DEFAULT) {
         assertNotNull("t0 is still there", cache.get("r", t0));
       }
@@ -117,6 +119,13 @@ public class TestMetricsCache {
     assertNull("t0 is gone", cache.get("r", t0));
   }
 
+  private void checkMetricValue(String description, MetricsCache.Record cr,
+      String key, Number val) {
+    assertEquals(description, val, cr.getMetric(key));
+    assertNotNull("metric not null", cr.getMetricInstance(key));
+    assertEquals(description, val, cr.getMetricInstance(key).value());
+  }
+
   private MetricsRecord makeRecord(String name, Collection<MetricsTag> tags,
       Collection<AbstractMetric> metrics) {
     MetricsRecord mr = mock(MetricsRecord.class);