HADOOP-18363. Fix bug preventing hadoop-metrics2 from emitting metrics to > 1 Ganglia servers (#4627)

* HADOOP-18363. Fix bug preventing hadoop-metrics2 from emitting metrics to > 1 Ganglia servers
This commit is contained in:
Ashutosh Gupta 2022-08-04 13:56:38 +01:00 committed by GitHub
parent dbf73e16b1
commit 0aa08ef543
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 49 deletions

View File

@ -21,6 +21,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -78,6 +79,10 @@ public abstract class AbstractGangliaSink implements MetricsSink {
private int offset; private int offset;
private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT; private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
public List<? extends SocketAddress> getMetricsServers() {
return metricsServers;
}
/** /**
* Used for visiting Metrics * Used for visiting Metrics
*/ */
@ -133,8 +138,11 @@ public void init(SubsetConfiguration conf) {
} }
// load the gannglia servers from properties // load the gannglia servers from properties
metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY), List<String> serversFromConf =
DEFAULT_PORT); conf.getList(String.class, SERVERS_PROPERTY, new ArrayList<String>());
metricsServers =
Servers.parse(serversFromConf.size() > 0 ? String.join(",", serversFromConf) : null,
DEFAULT_PORT);
multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY, multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
DEFAULT_MULTICAST_ENABLED); DEFAULT_MULTICAST_ENABLED);
multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL); multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -20,6 +20,7 @@
import org.apache.commons.configuration2.SubsetConfiguration; import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.impl.ConfigBuilder; import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import org.junit.Test; import org.junit.Test;
import java.net.DatagramSocket; import java.net.DatagramSocket;
@ -30,52 +31,61 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class TestGangliaSink { public class TestGangliaSink {
@Test @Test
public void testShouldCreateDatagramSocketByDefault() throws Exception { public void testShouldCreateDatagramSocketByDefault() throws Exception {
SubsetConfiguration conf = new ConfigBuilder() SubsetConfiguration conf = new ConfigBuilder().subset("test.sink.ganglia");
.subset("test.sink.ganglia");
GangliaSink30 gangliaSink = new GangliaSink30(); GangliaSink30 gangliaSink = new GangliaSink30();
gangliaSink.init(conf); gangliaSink.init(conf);
DatagramSocket socket = gangliaSink.getDatagramSocket(); DatagramSocket socket = gangliaSink.getDatagramSocket();
assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket); assertFalse("Did not create DatagramSocket",
} socket == null || socket instanceof MulticastSocket);
}
@Test @Test
public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception { public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
SubsetConfiguration conf = new ConfigBuilder() SubsetConfiguration conf =
.add("test.sink.ganglia.multicast", false) new ConfigBuilder().add("test.sink.ganglia.multicast", false).subset("test.sink.ganglia");
.subset("test.sink.ganglia"); GangliaSink30 gangliaSink = new GangliaSink30();
GangliaSink30 gangliaSink = new GangliaSink30(); gangliaSink.init(conf);
gangliaSink.init(conf); DatagramSocket socket = gangliaSink.getDatagramSocket();
DatagramSocket socket = gangliaSink.getDatagramSocket(); assertFalse("Did not create DatagramSocket",
assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket); socket == null || socket instanceof MulticastSocket);
} }
@Test @Test
public void testShouldCreateMulticastSocket() throws Exception { public void testShouldCreateMulticastSocket() throws Exception {
SubsetConfiguration conf = new ConfigBuilder() SubsetConfiguration conf =
.add("test.sink.ganglia.multicast", true) new ConfigBuilder().add("test.sink.ganglia.multicast", true).subset("test.sink.ganglia");
.subset("test.sink.ganglia"); GangliaSink30 gangliaSink = new GangliaSink30();
GangliaSink30 gangliaSink = new GangliaSink30(); gangliaSink.init(conf);
gangliaSink.init(conf); DatagramSocket socket = gangliaSink.getDatagramSocket();
DatagramSocket socket = gangliaSink.getDatagramSocket(); assertTrue("Did not create MulticastSocket",
assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket); socket != null && socket instanceof MulticastSocket);
int ttl = ((MulticastSocket) socket).getTimeToLive(); int ttl = ((MulticastSocket) socket).getTimeToLive();
assertEquals("Did not set default TTL", 1, ttl); assertEquals("Did not set default TTL", 1, ttl);
} }
@Test @Test
public void testShouldSetMulticastSocketTtl() throws Exception { public void testShouldSetMulticastSocketTtl() throws Exception {
SubsetConfiguration conf = new ConfigBuilder() SubsetConfiguration conf = new ConfigBuilder().add("test.sink.ganglia.multicast", true)
.add("test.sink.ganglia.multicast", true) .add("test.sink.ganglia.multicast.ttl", 3).subset("test.sink.ganglia");
.add("test.sink.ganglia.multicast.ttl", 3) GangliaSink30 gangliaSink = new GangliaSink30();
.subset("test.sink.ganglia"); gangliaSink.init(conf);
GangliaSink30 gangliaSink = new GangliaSink30(); DatagramSocket socket = gangliaSink.getDatagramSocket();
gangliaSink.init(conf); assertTrue("Did not create MulticastSocket",
DatagramSocket socket = gangliaSink.getDatagramSocket(); socket != null && socket instanceof MulticastSocket);
assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket); int ttl = ((MulticastSocket) socket).getTimeToLive();
int ttl = ((MulticastSocket) socket).getTimeToLive(); assertEquals("Did not set TTL", 3, ttl);
assertEquals("Did not set TTL", 3, ttl); }
}
@Test
public void testMultipleMetricsServers() {
SubsetConfiguration conf =
new ConfigBuilder().add("test.sink.ganglia.servers", "server1,server2")
.subset("test.sink.ganglia");
GangliaSink30 gangliaSink = new GangliaSink30();
gangliaSink.init(conf);
assertEquals(2, gangliaSink.getMetricsServers().size());
}
} }