From 1adec79c7bcdb53128cffc6a3c289fa6e86a53b1 Mon Sep 17 00:00:00 2001 From: Ravi Prakash Date: Wed, 18 Jun 2014 09:27:03 +0000 Subject: [PATCH] HADOOP-10660. GraphiteSink should implement Closeable (Chen He and Ted Yu via raviprak) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603379 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../hadoop/metrics2/sink/GraphiteSink.java | 34 +++++++++++++++-- .../metrics2/impl/TestGraphiteMetrics.java | 37 +++++++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9453b1bcfb..e58f8e9372 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -568,6 +568,8 @@ Release 2.5.0 - UNRELEASED HADOOP-10699. Fix build native library on mac osx (Binglin Chang via jlowe) + HADOOP-10660. GraphiteSink should implement Closeable (Chen He and Ted Yu via raviprak) + BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS HADOOP-10520. Extended attributes definition and FileSystem APIs for diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java index 7fa44486fa..b81ed4d80b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java @@ -18,13 +18,18 @@ package org.apache.hadoop.metrics2.sink; +import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; +import java.io.Closeable; import java.net.Socket; import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsRecord; @@ -36,12 +41,14 @@ import org.apache.hadoop.metrics2.MetricsTag; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class GraphiteSink implements MetricsSink { +public class GraphiteSink implements MetricsSink, Closeable { + private static final Log LOG = LogFactory.getLog(GraphiteSink.class); private static final String SERVER_HOST_KEY = "server_host"; private static final String SERVER_PORT_KEY = "server_port"; private static final String METRICS_PREFIX = "metrics_prefix"; private Writer writer = null; private String metricsPrefix = null; + private Socket socket = null; public void setWriter(Writer writer) { this.writer = writer; @@ -60,7 +67,7 @@ public class GraphiteSink implements MetricsSink { try { // Open an connection to Graphite server. - Socket socket = new Socket(serverHost, serverPort); + socket = new Socket(serverHost, serverPort); setWriter(new OutputStreamWriter(socket.getOutputStream())); } catch (Exception e) { throw new MetricsException("Error creating connection, " @@ -99,7 +106,11 @@ public class GraphiteSink implements MetricsSink { } try { - writer.write(lines.toString()); + if(writer != null){ + writer.write(lines.toString()); + } else { + throw new MetricsException("Writer in GraphiteSink is null!"); + } } catch (Exception e) { throw new MetricsException("Error sending metrics", e); } @@ -113,4 +124,21 @@ public class GraphiteSink implements MetricsSink { throw new MetricsException("Error flushing metrics", e); } } + + @Override + public void close() throws IOException { + try { + IOUtils.closeStream(writer); + writer = null; + LOG.info("writer in GraphiteSink is closed!"); + } catch (Throwable e){ + throw new MetricsException("Error closing writer", e); + } finally { + if (socket != null && !socket.isClosed()) { + socket.close(); + socket = null; + LOG.info("socket in GraphiteSink is closed!"); + } + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java index b41ea090b6..ab89bb8e82 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.io.OutputStreamWriter; +import java.io.Writer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -30,6 +31,7 @@ import java.util.Set; import static org.mockito.Mockito.*; import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.sink.GraphiteSink; @@ -107,4 +109,39 @@ public class TestGraphiteMetrics { result.equals("null.all.Context.Context=all.foo2 2 10\n" + "null.all.Context.Context=all.foo1 1 10\n")); } + @Test(expected=MetricsException.class) + public void testCloseAndWrite() throws IOException { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, "host")); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1.25)); + metrics.add(makeMetric("foo2", 2.25)); + MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); + + OutputStreamWriter writer = mock(OutputStreamWriter.class); + + sink.setWriter(writer); + sink.close(); + sink.putMetrics(record); + } + + @Test + public void testClose(){ + GraphiteSink sink = new GraphiteSink(); + Writer mockWriter = mock(Writer.class); + sink.setWriter(mockWriter); + try { + sink.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + + try { + verify(mockWriter).close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } }