From 4d2914210053f28c95094aa59e48f8e84c55a2c7 Mon Sep 17 00:00:00 2001 From: Ravi Prakash Date: Sat, 10 Jan 2015 08:35:40 -0800 Subject: [PATCH] HADOOP-11400. GraphiteSink does not reconnect to Graphite after 'broken pipe' (Kamil Gorlo via raviprak) --- .../hadoop-common/CHANGES.txt | 3 + .../hadoop/metrics2/sink/GraphiteSink.java | 143 +++++++++++++----- .../metrics2/impl/TestGraphiteMetrics.java | 111 +++++++++----- 3 files changed, 176 insertions(+), 81 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 90bfe3e0c9..679989aa87 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -488,6 +488,9 @@ Release 2.7.0 - UNRELEASED BUG FIXES + HADOOP 11400. GraphiteSink does not reconnect to Graphite after 'broken pipe' + (Kamil Gorlo via raviprak) + HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh) HADOOP-11166. Remove ulimit from test-patch.sh. (wang) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java index e72fe24844..e46a654e82 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java @@ -18,25 +18,24 @@ package org.apache.hadoop.metrics2.sink; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.io.Closeable; -import java.net.Socket; - import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.io.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsTag; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.Socket; + /** * A metrics sink that writes to a Graphite server */ @@ -47,30 +46,22 @@ public class GraphiteSink implements MetricsSink, Closeable { private static final String SERVER_HOST_KEY = "server_host"; private static final String SERVER_PORT_KEY = "server_port"; private static final String METRICS_PREFIX = "metrics_prefix"; - private Writer writer = null; private String metricsPrefix = null; - private Socket socket = null; + private Graphite graphite = null; @Override public void init(SubsetConfiguration conf) { // Get Graphite host configurations. - String serverHost = conf.getString(SERVER_HOST_KEY); - Integer serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); + final String serverHost = conf.getString(SERVER_HOST_KEY); + final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); // Get Graphite metrics graph prefix. metricsPrefix = conf.getString(METRICS_PREFIX); if (metricsPrefix == null) metricsPrefix = ""; - try { - // Open an connection to Graphite server. - socket = new Socket(serverHost, serverPort); - writer = new OutputStreamWriter( - socket.getOutputStream(), Charsets.UTF_8); - } catch (Exception e) { - throw new MetricsException("Error creating connection, " - + serverHost + ":" + serverPort, e); - } + graphite = new Graphite(serverHost, serverPort); + graphite.connect(); } @Override @@ -104,39 +95,111 @@ public class GraphiteSink implements MetricsSink, Closeable { } try { - if(writer != null){ - writer.write(lines.toString()); - } else { - throw new MetricsException("Writer in GraphiteSink is null!"); - } + graphite.write(lines.toString()); } catch (Exception e) { - throw new MetricsException("Error sending metrics", e); + LOG.warn("Error sending metrics to Graphite", e); + try { + graphite.close(); + } catch (Exception e1) { + throw new MetricsException("Error closing connection to Graphite", e1); + } } } @Override public void flush() { + try { + graphite.flush(); + } catch (Exception e) { + LOG.warn("Error flushing metrics to Graphite", e); try { - writer.flush(); - } catch (Exception e) { - throw new MetricsException("Error flushing metrics", e); + graphite.close(); + } catch (Exception e1) { + throw new MetricsException("Error closing connection to Graphite", e1); } + } } @Override public void close() throws IOException { - try { - IOUtils.closeStream(writer); - writer = null; - LOG.info("writer in GraphiteSink is closed!"); - } catch (Throwable e){ - throw new MetricsException("Error closing writer", e); - } finally { - if (socket != null && !socket.isClosed()) { - socket.close(); - socket = null; - LOG.info("socket in GraphiteSink is closed!"); + graphite.close(); + } + + public static class Graphite { + private final static int MAX_CONNECTION_FAILURES = 5; + + private String serverHost; + private int serverPort; + private Writer writer = null; + private Socket socket = null; + private int connectionFailures = 0; + + public Graphite(String serverHost, int serverPort) { + this.serverHost = serverHost; + this.serverPort = serverPort; + } + + public void connect() { + if (isConnected()) { + throw new MetricsException("Already connected to Graphite"); + } + if (tooManyConnectionFailures()) { + // return silently (there was ERROR in logs when we reached limit for the first time) + return; + } + try { + // Open a connection to Graphite server. + socket = new Socket(serverHost, serverPort); + writer = new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8); + } catch (Exception e) { + connectionFailures++; + if (tooManyConnectionFailures()) { + // first time when connection limit reached, report to logs + LOG.error("Too many connection failures, would not try to connect again."); + } + throw new MetricsException("Error creating connection, " + + serverHost + ":" + serverPort, e); } } + + public void write(String msg) throws IOException { + if (!isConnected()) { + connect(); + } + if (isConnected()) { + writer.write(msg); + } + } + + public void flush() throws IOException { + if (isConnected()) { + writer.flush(); + } + } + + public boolean isConnected() { + return socket != null && socket.isConnected() && !socket.isClosed(); + } + + public void close() throws IOException { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException ex) { + if (socket != null) { + socket.close(); + } + } finally { + socket = null; + writer = null; + } + } + + private boolean tooManyConnectionFailures() { + return connectionFailures > MAX_CONNECTION_FAILURES; + } + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java index 09f0081276..5fac41e027 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java @@ -18,23 +18,7 @@ package org.apache.hadoop.metrics2.impl; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.sink.GraphiteSink; @@ -42,6 +26,23 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.internal.util.reflection.Whitebox; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; + + public class TestGraphiteMetrics { private AbstractMetric makeMetric(String name, Number value) { AbstractMetric metric = mock(AbstractMetric.class); @@ -50,6 +51,12 @@ public class TestGraphiteMetrics { return metric; } + private GraphiteSink.Graphite makeGraphite() { + GraphiteSink.Graphite mockGraphite = mock(GraphiteSink.Graphite.class); + when(mockGraphite.isConnected()).thenReturn(true); + return mockGraphite; + } + @Test public void testPutMetrics() { GraphiteSink sink = new GraphiteSink(); @@ -61,18 +68,18 @@ public class TestGraphiteMetrics { metrics.add(makeMetric("foo2", 2.25)); MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - OutputStreamWriter mockWriter = mock(OutputStreamWriter.class); ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - Whitebox.setInternalState(sink, "writer", mockWriter); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); sink.putMetrics(record); try { - verify(mockWriter).write(argument.capture()); + verify(mockGraphite).write(argument.capture()); } catch (IOException e) { - e.printStackTrace(); + e.printStackTrace(); } - String result = argument.getValue().toString(); + String result = argument.getValue(); assertEquals(true, result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + @@ -86,24 +93,25 @@ public class TestGraphiteMetrics { GraphiteSink sink = new GraphiteSink(); List tags = new ArrayList(); tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, null)); + tags.add(new MetricsTag(MsInfo.Hostname, null)); Set metrics = new HashSet(); metrics.add(makeMetric("foo1", 1)); metrics.add(makeMetric("foo2", 2)); MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - OutputStreamWriter mockWriter = mock(OutputStreamWriter.class); + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - Whitebox.setInternalState(sink, "writer", mockWriter); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); sink.putMetrics(record); try { - verify(mockWriter).write(argument.capture()); + verify(mockGraphite).write(argument.capture()); } catch (IOException e) { e.printStackTrace(); } - String result = argument.getValue().toString(); + String result = argument.getValue(); assertEquals(true, result.equals("null.all.Context.Context=all.foo1 1 10\n" + @@ -120,8 +128,8 @@ public class TestGraphiteMetrics { // setup GraphiteSink GraphiteSink sink = new GraphiteSink(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Whitebox.setInternalState(sink, "writer", new OutputStreamWriter(out)); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); // given two metrics records with timestamps 1000 milliseconds apart. List tags = Collections.emptyList(); @@ -141,15 +149,16 @@ public class TestGraphiteMetrics { } // then the timestamps in the graphite stream should differ by one second. - String expectedOutput - = "null.default.Context.foo1 1 1000000000\n" - + "null.default.Context.foo1 1 1000000001\n"; - assertEquals(expectedOutput, out.toString()); + try { + verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000000\n")); + verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000001\n")); + } catch (IOException e) { + e.printStackTrace(); + } } - - @Test(expected=MetricsException.class) - public void testCloseAndWrite() throws IOException { + @Test + public void testFailureAndPutMetrics() throws IOException { GraphiteSink sink = new GraphiteSink(); List tags = new ArrayList(); tags.add(new MetricsTag(MsInfo.Context, "all")); @@ -159,18 +168,38 @@ public class TestGraphiteMetrics { metrics.add(makeMetric("foo2", 2.25)); MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - OutputStreamWriter writer = mock(OutputStreamWriter.class); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); + + // throw exception when first try + doThrow(new IOException("IO exception")).when(mockGraphite).write(anyString()); - Whitebox.setInternalState(sink, "writer", writer); - sink.close(); sink.putMetrics(record); + verify(mockGraphite).write(anyString()); + verify(mockGraphite).close(); + + // reset mock and try again + reset(mockGraphite); + when(mockGraphite.isConnected()).thenReturn(false); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + sink.putMetrics(record); + + verify(mockGraphite).write(argument.capture()); + String result = argument.getValue(); + + assertEquals(true, + result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || + result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); } @Test public void testClose(){ GraphiteSink sink = new GraphiteSink(); - Writer mockWriter = mock(Writer.class); - Whitebox.setInternalState(sink, "writer", mockWriter); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); try { sink.close(); } catch (IOException ioe) { @@ -178,7 +207,7 @@ public class TestGraphiteMetrics { } try { - verify(mockWriter).close(); + verify(mockGraphite).close(); } catch (IOException ioe) { ioe.printStackTrace(); }