HADOOP-4675 Current Ganglia metrics implementation is incompatible with Ganglia 3.1 -- reversing this patch
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@810714 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
412efb84f2
commit
e5fef21b18
@ -515,9 +515,6 @@ Trunk (unreleased changes)
|
|||||||
HADOOP-6224. Add a method to WritableUtils performing a bounded read of an
|
HADOOP-6224. Add a method to WritableUtils performing a bounded read of an
|
||||||
encoded String. (Jothi Padmanabhan via cdouglas)
|
encoded String. (Jothi Padmanabhan via cdouglas)
|
||||||
|
|
||||||
HADOOP-4675 Current Ganglia metrics implementation is incompatible with Ganglia 3.1
|
|
||||||
(Brian Brockelman, Scott Beardsley via stack)
|
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-5595. NameNode does not need to run a replicator to choose a
|
HADOOP-5595. NameNode does not need to run a replicator to choose a
|
||||||
|
@ -7,9 +7,7 @@ dfs.class=org.apache.hadoop.metrics.spi.NullContext
|
|||||||
#dfs.fileName=/tmp/dfsmetrics.log
|
#dfs.fileName=/tmp/dfsmetrics.log
|
||||||
|
|
||||||
# Configuration of the "dfs" context for ganglia
|
# Configuration of the "dfs" context for ganglia
|
||||||
# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
|
|
||||||
# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
|
# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
|
||||||
# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
|
|
||||||
# dfs.period=10
|
# dfs.period=10
|
||||||
# dfs.servers=localhost:8649
|
# dfs.servers=localhost:8649
|
||||||
|
|
||||||
@ -23,15 +21,13 @@ mapred.class=org.apache.hadoop.metrics.spi.NullContext
|
|||||||
#mapred.fileName=/tmp/mrmetrics.log
|
#mapred.fileName=/tmp/mrmetrics.log
|
||||||
|
|
||||||
# Configuration of the "mapred" context for ganglia
|
# Configuration of the "mapred" context for ganglia
|
||||||
# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
|
|
||||||
# mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
|
# mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
|
||||||
# mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
|
|
||||||
# mapred.period=10
|
# mapred.period=10
|
||||||
# mapred.servers=localhost:8649
|
# mapred.servers=localhost:8649
|
||||||
|
|
||||||
|
|
||||||
# Configuration of the "jvm" context for null
|
# Configuration of the "jvm" context for null
|
||||||
#jvm.class=org.apache.hadoop.metrics.spi.NullContext
|
jvm.class=org.apache.hadoop.metrics.spi.NullContext
|
||||||
|
|
||||||
# Configuration of the "jvm" context for file
|
# Configuration of the "jvm" context for file
|
||||||
#jvm.class=org.apache.hadoop.metrics.file.FileContext
|
#jvm.class=org.apache.hadoop.metrics.file.FileContext
|
||||||
|
@ -71,16 +71,16 @@ public class GangliaContext extends AbstractMetricsContext {
|
|||||||
typeTable.put(Float.class, "float");
|
typeTable.put(Float.class, "float");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected byte[] buffer = new byte[BUFFER_SIZE];
|
private byte[] buffer = new byte[BUFFER_SIZE];
|
||||||
protected int offset;
|
private int offset;
|
||||||
|
|
||||||
protected List<? extends SocketAddress> metricsServers;
|
private List<? extends SocketAddress> metricsServers;
|
||||||
private Map<String,String> unitsTable;
|
private Map<String,String> unitsTable;
|
||||||
private Map<String,String> slopeTable;
|
private Map<String,String> slopeTable;
|
||||||
private Map<String,String> tmaxTable;
|
private Map<String,String> tmaxTable;
|
||||||
private Map<String,String> dmaxTable;
|
private Map<String,String> dmaxTable;
|
||||||
|
|
||||||
protected DatagramSocket datagramSocket;
|
private DatagramSocket datagramSocket;
|
||||||
|
|
||||||
/** Creates a new instance of GangliaContext */
|
/** Creates a new instance of GangliaContext */
|
||||||
public GangliaContext() {
|
public GangliaContext() {
|
||||||
@ -132,7 +132,7 @@ public void emitRecord(String contextName, String recordName,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void emitMetric(String name, String type, String value)
|
private void emitMetric(String name, String type, String value)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String units = getUnits(name);
|
String units = getUnits(name);
|
||||||
int slope = getSlope(name);
|
int slope = getSlope(name);
|
||||||
@ -156,7 +156,7 @@ protected void emitMetric(String name, String type, String value)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getUnits(String metricName) {
|
private String getUnits(String metricName) {
|
||||||
String result = unitsTable.get(metricName);
|
String result = unitsTable.get(metricName);
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
result = DEFAULT_UNITS;
|
result = DEFAULT_UNITS;
|
||||||
@ -164,7 +164,7 @@ protected String getUnits(String metricName) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getSlope(String metricName) {
|
private int getSlope(String metricName) {
|
||||||
String slopeString = slopeTable.get(metricName);
|
String slopeString = slopeTable.get(metricName);
|
||||||
if (slopeString == null) {
|
if (slopeString == null) {
|
||||||
slopeString = DEFAULT_SLOPE;
|
slopeString = DEFAULT_SLOPE;
|
||||||
@ -172,7 +172,7 @@ protected int getSlope(String metricName) {
|
|||||||
return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
|
return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getTmax(String metricName) {
|
private int getTmax(String metricName) {
|
||||||
if (tmaxTable == null) {
|
if (tmaxTable == null) {
|
||||||
return DEFAULT_TMAX;
|
return DEFAULT_TMAX;
|
||||||
}
|
}
|
||||||
@ -185,7 +185,7 @@ protected int getTmax(String metricName) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getDmax(String metricName) {
|
private int getDmax(String metricName) {
|
||||||
String dmaxString = dmaxTable.get(metricName);
|
String dmaxString = dmaxTable.get(metricName);
|
||||||
if (dmaxString == null) {
|
if (dmaxString == null) {
|
||||||
return DEFAULT_DMAX;
|
return DEFAULT_DMAX;
|
||||||
@ -200,7 +200,7 @@ protected int getDmax(String metricName) {
|
|||||||
* as an int, followed by the bytes of the string, padded if necessary to
|
* as an int, followed by the bytes of the string, padded if necessary to
|
||||||
* a multiple of 4.
|
* a multiple of 4.
|
||||||
*/
|
*/
|
||||||
protected void xdr_string(String s) {
|
private void xdr_string(String s) {
|
||||||
byte[] bytes = s.getBytes();
|
byte[] bytes = s.getBytes();
|
||||||
int len = bytes.length;
|
int len = bytes.length;
|
||||||
xdr_int(len);
|
xdr_int(len);
|
||||||
@ -222,7 +222,7 @@ private void pad() {
|
|||||||
/**
|
/**
|
||||||
* Puts an integer into the buffer as 4 bytes, big-endian.
|
* Puts an integer into the buffer as 4 bytes, big-endian.
|
||||||
*/
|
*/
|
||||||
protected void xdr_int(int i) {
|
private void xdr_int(int i) {
|
||||||
buffer[offset++] = (byte)((i >> 24) & 0xff);
|
buffer[offset++] = (byte)((i >> 24) & 0xff);
|
||||||
buffer[offset++] = (byte)((i >> 16) & 0xff);
|
buffer[offset++] = (byte)((i >> 16) & 0xff);
|
||||||
buffer[offset++] = (byte)((i >> 8) & 0xff);
|
buffer[offset++] = (byte)((i >> 8) & 0xff);
|
||||||
|
@ -1,144 +0,0 @@
|
|||||||
/*
|
|
||||||
* GangliaContext.java
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.metrics.ganglia;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.DatagramPacket;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.metrics.ContextFactory;
|
|
||||||
import org.apache.hadoop.net.DNS;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Context for sending metrics to Ganglia version 3.1.x.
|
|
||||||
*
|
|
||||||
* 3.1.1 has a slightly different wire portal compared to 3.0.x.
|
|
||||||
*/
|
|
||||||
public class GangliaContext31 extends GangliaContext {
|
|
||||||
|
|
||||||
String hostName = "UNKNOWN.example.com";
|
|
||||||
|
|
||||||
private static final Log LOG =
|
|
||||||
LogFactory.getLog("org.apache.hadoop.util.GangliaContext31");
|
|
||||||
|
|
||||||
public void init(String contextName, ContextFactory factory) {
|
|
||||||
super.init(contextName, factory);
|
|
||||||
|
|
||||||
LOG.debug("Initializing the GangliaContext31 for Ganglia 3.1 metrics.");
|
|
||||||
|
|
||||||
// Take the hostname from the DNS class.
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
|
|
||||||
if (conf.get("slave.host.name") != null) {
|
|
||||||
hostName = conf.get("slave.host.name");
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
hostName = DNS.getDefaultHost(
|
|
||||||
conf.get("dfs.datanode.dns.interface","default"),
|
|
||||||
conf.get("dfs.datanode.dns.nameserver","default"));
|
|
||||||
} catch (UnknownHostException uhe) {
|
|
||||||
LOG.error(uhe);
|
|
||||||
hostName = "UNKNOWN.example.com";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void emitMetric(String name, String type, String value)
|
|
||||||
throws IOException
|
|
||||||
{
|
|
||||||
if (name == null) {
|
|
||||||
LOG.warn("Metric was emitted with no name.");
|
|
||||||
return;
|
|
||||||
} else if (value == null) {
|
|
||||||
LOG.warn("Metric name " + name +" was emitted with a null value.");
|
|
||||||
return;
|
|
||||||
} else if (type == null) {
|
|
||||||
LOG.warn("Metric name " + name + ", value " + value + " has no type.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.debug("Emitting metric " + name + ", type " + type + ", value " +
|
|
||||||
value + " from hostname" + hostName);
|
|
||||||
|
|
||||||
String units = getUnits(name);
|
|
||||||
if (units == null) {
|
|
||||||
LOG.warn("Metric name " + name + ", value " + value
|
|
||||||
+ " had 'null' units");
|
|
||||||
units = "";
|
|
||||||
}
|
|
||||||
int slope = getSlope(name);
|
|
||||||
int tmax = getTmax(name);
|
|
||||||
int dmax = getDmax(name);
|
|
||||||
offset = 0;
|
|
||||||
String groupName = name.substring(0,name.lastIndexOf("."));
|
|
||||||
|
|
||||||
// The following XDR recipe was done through a careful reading of
|
|
||||||
// gm_protocol.x in Ganglia 3.1 and carefully examining the output of
|
|
||||||
// the gmetric utility with strace.
|
|
||||||
|
|
||||||
// First we send out a metadata message
|
|
||||||
xdr_int(128); // metric_id = metadata_msg
|
|
||||||
xdr_string(hostName); // hostname
|
|
||||||
xdr_string(name); // metric name
|
|
||||||
xdr_int(0); // spoof = False
|
|
||||||
xdr_string(type); // metric type
|
|
||||||
xdr_string(name); // metric name
|
|
||||||
xdr_string(units); // units
|
|
||||||
xdr_int(slope); // slope
|
|
||||||
xdr_int(tmax); // tmax, the maximum time between metrics
|
|
||||||
xdr_int(dmax); // dmax, the maximum data value
|
|
||||||
|
|
||||||
xdr_int(1); /*Num of the entries in extra_value field for
|
|
||||||
Ganglia 3.1.x*/
|
|
||||||
xdr_string("GROUP"); /*Group attribute*/
|
|
||||||
xdr_string(groupName); /*Group value*/
|
|
||||||
|
|
||||||
for (SocketAddress socketAddress : metricsServers) {
|
|
||||||
DatagramPacket packet =
|
|
||||||
new DatagramPacket(buffer, offset, socketAddress);
|
|
||||||
datagramSocket.send(packet);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now we send out a message with the actual value.
|
|
||||||
// Technically, we only need to send out the metadata message once for
|
|
||||||
// each metric, but I don't want to have to record which metrics we did and
|
|
||||||
// did not send.
|
|
||||||
offset = 0;
|
|
||||||
xdr_int(133); // we are sending a string value
|
|
||||||
xdr_string(hostName); // hostName
|
|
||||||
xdr_string(name); // metric name
|
|
||||||
xdr_int(0); // spoof = False
|
|
||||||
xdr_string("%s"); // format field
|
|
||||||
xdr_string(value); // metric value
|
|
||||||
|
|
||||||
for (SocketAddress socketAddress : metricsServers) {
|
|
||||||
DatagramPacket packet =
|
|
||||||
new DatagramPacket(buffer, offset, socketAddress);
|
|
||||||
datagramSocket.send(packet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user