diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index ec87c753f5..f40bd172b6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -901,9 +901,24 @@ public String getSchedulingDecisionSummary() { public String getCallVolumeSummary() { try { ObjectMapper om = new ObjectMapper(); - return om.writeValueAsString(callCounts); + return om.writeValueAsString(getDecayedCallCounts()); } catch (Exception e) { return "Error: " + e.getMessage(); } } + + private Map getDecayedCallCounts() { + Map decayedCallCounts = new HashMap<>(callCounts.size()); + Iterator>> it = + callCounts.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> entry = it.next(); + Object user = entry.getKey(); + Long decayedCount = entry.getValue().get(0).get(); + if (decayedCount > 0) { + decayedCallCounts.put(user, decayedCount); + } + } + return decayedCallCounts; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index 0b0408cbcd..58380c5410 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -30,6 +30,10 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; + public class TestDecayRpcScheduler { private Schedulable mockCall(String id) { Schedulable mockCall = mock(Schedulable.class); @@ -189,12 +193,14 @@ public void testDecay() throws Exception { @Test @SuppressWarnings("deprecation") - public void testPriority() { + public void testPriority() throws Exception { Configuration conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush - conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, - "25, 50, 75"); - scheduler = new DecayRpcScheduler(4, "ns", conf); + final String namespace = "ns"; + conf.set(namespace + "." + DecayRpcScheduler + .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush + conf.set(namespace + "." + DecayRpcScheduler + .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75"); + scheduler = new DecayRpcScheduler(4, namespace, conf); assertEquals(0, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(2, scheduler.getPriorityLevel(mockCall("A"))); @@ -206,6 +212,20 @@ public void testPriority() { assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(2, scheduler.getPriorityLevel(mockCall("A"))); + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxbeanName = new ObjectName( + "Hadoop:service="+ namespace + ",name=DecayRpcScheduler"); + + String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary"); + assertTrue("Get expected JMX of CallVolumeSummary before decay", + cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}")); + + scheduler.forceDecay(); + + String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary"); + assertTrue("Get expected JMX for CallVolumeSummary after decay", + cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}")); } @Test(timeout=2000)