diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f081d65aaf..f0fbdcd696 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -371,6 +371,8 @@ Release 2.5.0 - UNRELEASED HADOOP-10498. Add support for proxy server. (daryn) + HADOOP-9704. Write metrics sink plugin for Hadoop/Graphite (Chu Tong, Alex Newman and Babak Behzad via raviprak) + IMPROVEMENTS HADOOP-10451. Remove unused field and imports from SaslRpcServer. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java new file mode 100644 index 0000000000..7fa44486fa --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink; + +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.Socket; + +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; + +/** + * A metrics sink that writes to a Graphite server + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class GraphiteSink implements MetricsSink { + private static final String SERVER_HOST_KEY = "server_host"; + private static final String SERVER_PORT_KEY = "server_port"; + private static final String METRICS_PREFIX = "metrics_prefix"; + private Writer writer = null; + private String metricsPrefix = null; + + public void setWriter(Writer writer) { + this.writer = writer; + } + + @Override + public void init(SubsetConfiguration conf) { + // Get Graphite host configurations. + String serverHost = conf.getString(SERVER_HOST_KEY); + Integer serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); + + // Get Graphite metrics graph prefix. + metricsPrefix = conf.getString(METRICS_PREFIX); + if (metricsPrefix == null) + metricsPrefix = ""; + + try { + // Open an connection to Graphite server. + Socket socket = new Socket(serverHost, serverPort); + setWriter(new OutputStreamWriter(socket.getOutputStream())); + } catch (Exception e) { + throw new MetricsException("Error creating connection, " + + serverHost + ":" + serverPort, e); + } + } + + @Override + public void putMetrics(MetricsRecord record) { + StringBuilder lines = new StringBuilder(); + StringBuilder metricsPathPrefix = new StringBuilder(); + + // Configure the hierarchical place to display the graph. + metricsPathPrefix.append(metricsPrefix).append(".") + .append(record.context()).append(".").append(record.name()); + + for (MetricsTag tag : record.tags()) { + if (tag.value() != null) { + metricsPathPrefix.append("."); + metricsPathPrefix.append(tag.name()); + metricsPathPrefix.append("="); + metricsPathPrefix.append(tag.value()); + } + } + + // Round the timestamp to second as Graphite accepts it in such format. + int timestamp = Math.round(record.timestamp() / 1000.0f); + + // Collect datapoints. + for (AbstractMetric metric : record.metrics()) { + lines.append( + metricsPathPrefix.toString() + "." + + metric.name().replace(' ', '.')).append(" ") + .append(metric.value()).append(" ").append(timestamp) + .append("\n"); + } + + try { + writer.write(lines.toString()); + } catch (Exception e) { + throw new MetricsException("Error sending metrics", e); + } + } + + @Override + public void flush() { + try { + writer.flush(); + } catch (Exception e) { + throw new MetricsException("Error flushing metrics", e); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java new file mode 100644 index 0000000000..b41ea090b6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.impl; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.mockito.Mockito.*; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.sink.GraphiteSink; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestGraphiteMetrics { + private AbstractMetric makeMetric(String name, Number value) { + AbstractMetric metric = mock(AbstractMetric.class); + when(metric.name()).thenReturn(name); + when(metric.value()).thenReturn(value); + return metric; + } + + @Test + public void testPutMetrics() { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, "host")); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1.25)); + metrics.add(makeMetric("foo2", 2.25)); + MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); + + OutputStreamWriter writer = mock(OutputStreamWriter.class); + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + + sink.setWriter(writer); + sink.putMetrics(record); + + try { + verify(writer).write(argument.capture()); + } catch (IOException e) { + e.printStackTrace(); + } + + String result = argument.getValue().toString(); + + assertEquals(true, + result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || + result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); + } + + @Test + public void testPutMetrics2() { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, null)); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1)); + metrics.add(makeMetric("foo2", 2)); + MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); + + OutputStreamWriter writer = mock(OutputStreamWriter.class); + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + + sink.setWriter(writer); + sink.putMetrics(record); + + try { + verify(writer).write(argument.capture()); + } catch (IOException e) { + e.printStackTrace(); + } + + String result = argument.getValue().toString(); + + assertEquals(true, + result.equals("null.all.Context.Context=all.foo1 1 10\n" + + "null.all.Context.Context=all.foo2 2 10\n") || + result.equals("null.all.Context.Context=all.foo2 2 10\n" + + "null.all.Context.Context=all.foo1 1 10\n")); + } +}