diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index cbf8ebd320..50ed353255 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -261,6 +261,7 @@ public class CallQueueManager { Class> queueClassToUse, int maxSize, String ns, Configuration conf) { int priorityLevels = parseNumLevels(ns, conf); + this.scheduler.stop(); RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels, ns, conf); BlockingQueue newQ = createCallQueueInstance(queueClassToUse, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 5547b1a0aa..3c09625830 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReference; +import javax.management.ObjectName; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AtomicDoubleArray; @@ -162,6 +164,7 @@ public class DecayRpcScheduler implements RpcScheduler, private final String namespace; private final int topUsersCount; // e.g., report top 10 users' metrics private static final double PRECISION = 0.0001; + private MetricsProxy metricsProxy; /** * This TimerTask will call decayCurrentCounts until @@ -230,9 +233,8 @@ public class DecayRpcScheduler implements RpcScheduler, DecayTask task = new DecayTask(this, timer); timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis); - MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels); - prox.setDelegate(this); - prox.registerMetrics2Source(ns); + metricsProxy = MetricsProxy.getInstance(ns, numLevels); + metricsProxy.setDelegate(this); } // Load configs @@ -671,11 +673,14 @@ public class DecayRpcScheduler implements RpcScheduler, private WeakReference delegate; private double[] averageResponseTimeDefault; private long[] callCountInLastWindowDefault; + private ObjectName decayRpcSchedulerInfoBeanName; private MetricsProxy(String namespace, int numLevels) { averageResponseTimeDefault = new double[numLevels]; callCountInLastWindowDefault = new long[numLevels]; - MBeans.register(namespace, "DecayRpcScheduler", this); + decayRpcSchedulerInfoBeanName = + MBeans.register(namespace, "DecayRpcScheduler", this); + this.registerMetrics2Source(namespace); } public static synchronized MetricsProxy getInstance(String namespace, @@ -689,6 +694,10 @@ public class DecayRpcScheduler implements RpcScheduler, return mp; } + public static synchronized void removeInstance(String namespace) { + MetricsProxy.INSTANCES.remove(namespace); + } + public void setDelegate(DecayRpcScheduler obj) { this.delegate = new WeakReference(obj); } @@ -698,6 +707,14 @@ public class DecayRpcScheduler implements RpcScheduler, DefaultMetricsSystem.instance().register(name, name, this); } + void unregisterSource(String namespace) { + final String name = "DecayRpcSchedulerMetrics2." + namespace; + DefaultMetricsSystem.instance().unregisterSource(name); + if (decayRpcSchedulerInfoBeanName != null) { + MBeans.unregister(decayRpcSchedulerInfoBeanName); + } + } + @Override public String getSchedulingDecisionSummary() { DecayRpcScheduler scheduler = delegate.get(); @@ -921,4 +938,10 @@ public class DecayRpcScheduler implements RpcScheduler, } return decayedCallCounts; } + + @Override + public void stop() { + metricsProxy.unregisterSource(namespace); + MetricsProxy.removeInstance(namespace); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java index 08f74d4b1c..0847af7f37 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java @@ -42,4 +42,8 @@ public class DefaultRpcScheduler implements RpcScheduler { public DefaultRpcScheduler(int priorityLevels, String namespace, Configuration conf) { } + + @Override + public void stop() { + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java index 6f93b22eed..95c5a13cdf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java @@ -32,4 +32,6 @@ public interface RpcScheduler { void addResponseTime(String name, int priorityLevel, int queueTime, int processingTime); + + void stop(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e477b81062..ae6430f726 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -525,7 +525,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { /** Allow access to the client RPC server for testing */ @VisibleForTesting - RPC.Server getClientRpcServer() { + public RPC.Server getClientRpcServer() { return clientRpcServer; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java index 5cb7def880..d5eb9cfc48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java @@ -33,49 +33,42 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.ipc.FairCallQueue; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.junit.After; -import org.junit.Before; import org.junit.Test; public class TestRefreshCallQueue { private MiniDFSCluster cluster; private Configuration config; - private FileSystem fs; static int mockQueueConstructions; static int mockQueuePuts; - private String callQueueConfigKey = ""; - private final Random rand = new Random(); + private int nnPort = 0; - @Before - public void setUp() throws Exception { - // We want to count additional events, so we reset here - mockQueueConstructions = 0; - mockQueuePuts = 0; + private void setUp(Class queueClass) throws IOException { int portRetries = 5; - int nnPort; - + Random rand = new Random(); for (; portRetries > 0; --portRetries) { // Pick a random port in the range [30000,60000). - nnPort = 30000 + rand.nextInt(30000); + nnPort = 30000 + rand.nextInt(30000); config = new Configuration(); - callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl"; - config.setClass(callQueueConfigKey, - MockCallQueue.class, BlockingQueue.class); + String callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl"; + config.setClass(callQueueConfigKey, queueClass, BlockingQueue.class); config.set("hadoop.security.authorization", "true"); FileSystem.setDefaultUri(config, "hdfs://localhost:" + nnPort); - fs = FileSystem.get(config); - try { - cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort).build(); + cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort) + .build(); cluster.waitActive(); break; } catch (BindException be) { // Retry with a different port number. } } - if (portRetries == 0) { // Bail if we get very unlucky with our choice of ports. fail("Failed to pick an ephemeral port for the NameNode RPC server."); @@ -83,8 +76,8 @@ public class TestRefreshCallQueue { } @After - public void tearDown() throws Exception { - if(cluster!=null) { + public void tearDown() throws IOException { + if (cluster != null) { cluster.shutdown(); cluster = null; } @@ -105,29 +98,66 @@ public class TestRefreshCallQueue { // Returns true if mock queue was used for put public boolean canPutInMockQueue() throws IOException { - int putsBefore = mockQueuePuts; - fs.exists(new Path("/")); // Make an RPC call - return mockQueuePuts > putsBefore; + FileSystem fs = FileSystem.get(config); + int putsBefore = mockQueuePuts; + fs.exists(new Path("/")); // Make an RPC call + fs.close(); + return mockQueuePuts > putsBefore; } @Test public void testRefresh() throws Exception { - assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0); + // We want to count additional events, so we reset here + mockQueueConstructions = 0; + mockQueuePuts = 0; + setUp(MockCallQueue.class); + + assertTrue("Mock queue should have been constructed", + mockQueueConstructions > 0); assertTrue("Puts are routed through MockQueue", canPutInMockQueue()); int lastMockQueueConstructions = mockQueueConstructions; - // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue + // Replace queue with the queue specified in core-site.xml, which would be + // the LinkedBlockingQueue DFSAdmin admin = new DFSAdmin(config); String [] args = new String[]{"-refreshCallQueue"}; int exitCode = admin.run(args); assertEquals("DFSAdmin should return 0", 0, exitCode); - assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions); + assertEquals("Mock queue should have no additional constructions", + lastMockQueueConstructions, mockQueueConstructions); try { - assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue()); - } catch (IOException ioe){ + assertFalse("Puts are routed through LBQ instead of MockQueue", + canPutInMockQueue()); + } catch (IOException ioe) { fail("Could not put into queue at all"); } } + @Test + public void testRefreshCallQueueWithFairCallQueue() throws Exception { + setUp(FairCallQueue.class); + boolean oldValue = DefaultMetricsSystem.inMiniClusterMode(); + + // throw an error when we double-initialize JvmMetrics + DefaultMetricsSystem.setMiniClusterMode(false); + + NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc(); + try { + rpcServer.getClientRpcServer().refreshCallQueue(config); + } catch (Exception e) { + Throwable cause = e.getCause(); + if ((cause instanceof MetricsException) + && cause.getMessage().contains( + "Metrics source DecayRpcSchedulerMetrics2.ipc." + nnPort + + " already exists!")) { + fail("DecayRpcScheduler metrics should be unregistered before" + + " reregister"); + } + throw e; + } finally { + DefaultMetricsSystem.setMiniClusterMode(oldValue); + } + } + } \ No newline at end of file