HADOOP-13285. DecayRpcScheduler MXBean should only report decayed CallVolumeSummary. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2016-06-17 15:25:14 -07:00
parent 280069510b
commit 0761379fe4
2 changed files with 41 additions and 6 deletions

View File

@ -901,9 +901,24 @@ public String getSchedulingDecisionSummary() {
public String getCallVolumeSummary() {
try {
ObjectMapper om = new ObjectMapper();
return om.writeValueAsString(callCounts);
return om.writeValueAsString(getDecayedCallCounts());
} catch (Exception e) {
return "Error: " + e.getMessage();
}
}
private Map<Object, Long> getDecayedCallCounts() {
Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
callCounts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, List<AtomicLong>> entry = it.next();
Object user = entry.getKey();
Long decayedCount = entry.getValue().get(0).get();
if (decayedCount > 0) {
decayedCallCounts.put(user, decayedCount);
}
}
return decayedCallCounts;
}
}

View File

@ -30,6 +30,10 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) {
Schedulable mockCall = mock(Schedulable.class);
@ -189,12 +193,14 @@ public void testDecay() throws Exception {
@Test
@SuppressWarnings("deprecation")
public void testPriority() {
public void testPriority() throws Exception {
Configuration conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
"25, 50, 75");
scheduler = new DecayRpcScheduler(4, "ns", conf);
final String namespace = "ns";
conf.set(namespace + "." + DecayRpcScheduler
.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
conf.set(namespace + "." + DecayRpcScheduler
.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
scheduler = new DecayRpcScheduler(4, namespace, conf);
assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
@ -206,6 +212,20 @@ public void testPriority() {
assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service="+ namespace + ",name=DecayRpcScheduler");
String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
assertTrue("Get expected JMX of CallVolumeSummary before decay",
cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}"));
scheduler.forceDecay();
String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
assertTrue("Get expected JMX for CallVolumeSummary after decay",
cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}"));
}
@Test(timeout=2000)