HDFS-15707. NNTop counts don't add up as expected. (#2516)
This commit is contained in:
parent
da1ea2530f
commit
32099e36dd
@ -23,7 +23,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
|
||||||
@ -144,8 +143,6 @@ public void report(long currTime, String userName, String cmd) {
|
|||||||
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
|
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
|
||||||
.values()) {
|
.values()) {
|
||||||
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
|
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
|
||||||
rollingWindowManager.recordMetric(currTime,
|
|
||||||
TopConf.ALL_CMDS, userName, 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,8 +113,8 @@ private int computeBucketIndex(long time) {
|
|||||||
* as well as atomic fields.
|
* as well as atomic fields.
|
||||||
*/
|
*/
|
||||||
private class Bucket {
|
private class Bucket {
|
||||||
AtomicLong value = new AtomicLong(0);
|
private AtomicLong value = new AtomicLong(0);
|
||||||
AtomicLong updateTime = new AtomicLong(0);
|
private AtomicLong updateTime = new AtomicLong(-1); // -1 = never updated.
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether the last time that the bucket was updated is no longer
|
* Check whether the last time that the bucket was updated is no longer
|
||||||
@ -125,7 +125,7 @@ private class Bucket {
|
|||||||
*/
|
*/
|
||||||
boolean isStaleNow(long time) {
|
boolean isStaleNow(long time) {
|
||||||
long utime = updateTime.get();
|
long utime = updateTime.get();
|
||||||
return time - utime >= windowLenMs;
|
return (utime == -1) || (time - utime >= windowLenMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -17,20 +17,22 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Stack;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
|
|
||||||
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -66,11 +68,15 @@ public static class TopWindow {
|
|||||||
|
|
||||||
public TopWindow(int windowMillis) {
|
public TopWindow(int windowMillis) {
|
||||||
this.windowMillis = windowMillis;
|
this.windowMillis = windowMillis;
|
||||||
this.top = Lists.newArrayList();
|
this.top = new LinkedList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addOp(Op op) {
|
public void addOp(Op op) {
|
||||||
top.add(op);
|
if (op.getOpType().equals(TopConf.ALL_CMDS)) {
|
||||||
|
top.add(0, op);
|
||||||
|
} else {
|
||||||
|
top.add(op);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getWindowLenMs() {
|
public int getWindowLenMs() {
|
||||||
@ -86,41 +92,59 @@ public List<Op> getOps() {
|
|||||||
* Represents an operation within a TopWindow. It contains a ranked
|
* Represents an operation within a TopWindow. It contains a ranked
|
||||||
* set of the top users for the operation.
|
* set of the top users for the operation.
|
||||||
*/
|
*/
|
||||||
public static class Op {
|
public static class Op implements Comparable<Op> {
|
||||||
private final String opType;
|
private final String opType;
|
||||||
private final List<User> topUsers;
|
private final List<User> users;
|
||||||
private final long totalCount;
|
private final long totalCount;
|
||||||
|
private final int limit;
|
||||||
|
|
||||||
public Op(String opType, long totalCount) {
|
public Op(String opType, UserCounts users, int limit) {
|
||||||
this.opType = opType;
|
this.opType = opType;
|
||||||
this.topUsers = Lists.newArrayList();
|
this.users = new ArrayList<>(users);
|
||||||
this.totalCount = totalCount;
|
this.users.sort(Collections.reverseOrder());
|
||||||
}
|
this.totalCount = users.getTotal();
|
||||||
|
this.limit = limit;
|
||||||
public void addUser(User u) {
|
|
||||||
topUsers.add(u);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getOpType() {
|
public String getOpType() {
|
||||||
return opType;
|
return opType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<User> getAllUsers() {
|
||||||
|
return users;
|
||||||
|
}
|
||||||
|
|
||||||
public List<User> getTopUsers() {
|
public List<User> getTopUsers() {
|
||||||
return topUsers;
|
return (users.size() > limit) ? users.subList(0, limit) : users;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getTotalCount() {
|
public long getTotalCount() {
|
||||||
return totalCount;
|
return totalCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(Op other) {
|
||||||
|
return Long.signum(totalCount - other.totalCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
return (o instanceof Op) && totalCount == ((Op)o).totalCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return opType.hashCode();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents a user who called an Op within a TopWindow. Specifies the
|
* Represents a user who called an Op within a TopWindow. Specifies the
|
||||||
* user and the number of times the user called the operation.
|
* user and the number of times the user called the operation.
|
||||||
*/
|
*/
|
||||||
public static class User {
|
public static class User implements Comparable<User> {
|
||||||
private final String user;
|
private final String user;
|
||||||
private final long count;
|
private long count;
|
||||||
|
|
||||||
public User(String user, long count) {
|
public User(String user, long count) {
|
||||||
this.user = user;
|
this.user = user;
|
||||||
@ -134,6 +158,56 @@ public String getUser() {
|
|||||||
public long getCount() {
|
public long getCount() {
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void add(long delta) {
|
||||||
|
count += delta;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(User other) {
|
||||||
|
return Long.signum(count - other.count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
return (o instanceof User) && user.equals(((User)o).user);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return user.hashCode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class UserCounts extends ArrayList<User> {
|
||||||
|
private long total = 0;
|
||||||
|
|
||||||
|
UserCounts(int capacity) {
|
||||||
|
super(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean add(User user) {
|
||||||
|
long count = user.getCount();
|
||||||
|
int i = indexOf(user);
|
||||||
|
if (i == -1) {
|
||||||
|
super.add(new User(user.getUser(), count));
|
||||||
|
} else {
|
||||||
|
get(i).add(count);
|
||||||
|
}
|
||||||
|
total += count;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean addAll(Collection<? extends User> users) {
|
||||||
|
users.forEach(user -> add(user));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotal() {
|
||||||
|
return total;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -142,7 +216,7 @@ public long getCount() {
|
|||||||
* operated on that metric.
|
* operated on that metric.
|
||||||
*/
|
*/
|
||||||
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
|
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
|
||||||
new ConcurrentHashMap<String, RollingWindowMap>();
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
|
public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
|
||||||
|
|
||||||
@ -184,35 +258,33 @@ public void recordMetric(long time, String command,
|
|||||||
*
|
*
|
||||||
* @param time the current time
|
* @param time the current time
|
||||||
* @return a TopWindow describing the top users for each metric in the
|
* @return a TopWindow describing the top users for each metric in the
|
||||||
* window.
|
* window.
|
||||||
*/
|
*/
|
||||||
public TopWindow snapshot(long time) {
|
public TopWindow snapshot(long time) {
|
||||||
TopWindow window = new TopWindow(windowLenMs);
|
TopWindow window = new TopWindow(windowLenMs);
|
||||||
Set<String> metricNames = metricMap.keySet();
|
Set<String> metricNames = metricMap.keySet();
|
||||||
LOG.debug("iterating in reported metrics, size={} values={}",
|
LOG.debug("iterating in reported metrics, size={} values={}",
|
||||||
metricNames.size(), metricNames);
|
metricNames.size(), metricNames);
|
||||||
|
UserCounts totalCounts = new UserCounts(metricMap.size());
|
||||||
for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
|
for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
|
||||||
String metricName = entry.getKey();
|
String metricName = entry.getKey();
|
||||||
RollingWindowMap rollingWindows = entry.getValue();
|
RollingWindowMap rollingWindows = entry.getValue();
|
||||||
TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
|
UserCounts topN = getTopUsersForMetric(time, metricName, rollingWindows);
|
||||||
final int size = topN.size();
|
if (!topN.isEmpty()) {
|
||||||
if (size == 0) {
|
window.addOp(new Op(metricName, topN, topUsersCnt));
|
||||||
continue;
|
totalCounts.addAll(topN);
|
||||||
}
|
|
||||||
Op op = new Op(metricName, topN.getTotal());
|
|
||||||
window.addOp(op);
|
|
||||||
// Reverse the users from the TopUsers using a stack,
|
|
||||||
// since we'd like them sorted in descending rather than ascending order
|
|
||||||
Stack<NameValuePair> reverse = new Stack<NameValuePair>();
|
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
reverse.push(topN.poll());
|
|
||||||
}
|
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
NameValuePair userEntry = reverse.pop();
|
|
||||||
User user = new User(userEntry.getName(), userEntry.getValue());
|
|
||||||
op.addUser(user);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// synthesize the overall total op count with the top users for every op.
|
||||||
|
Set<User> topUsers = new HashSet<>();
|
||||||
|
for (Op op : window.getOps()) {
|
||||||
|
topUsers.addAll(op.getTopUsers());
|
||||||
|
}
|
||||||
|
// intersect totals with the top users.
|
||||||
|
totalCounts.retainAll(topUsers);
|
||||||
|
// allowed to exceed the per-op topUsersCnt to capture total ops for
|
||||||
|
// any user
|
||||||
|
window.addOp(new Op(TopConf.ALL_CMDS, totalCounts, Integer.MAX_VALUE));
|
||||||
return window;
|
return window;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,9 +295,9 @@ public TopWindow snapshot(long time) {
|
|||||||
* @param metricName Name of metric
|
* @param metricName Name of metric
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private TopN getTopUsersForMetric(long time, String metricName,
|
private UserCounts getTopUsersForMetric(long time, String metricName,
|
||||||
RollingWindowMap rollingWindows) {
|
RollingWindowMap rollingWindows) {
|
||||||
TopN topN = new TopN(topUsersCnt);
|
UserCounts topN = new UserCounts(topUsersCnt);
|
||||||
Iterator<Map.Entry<String, RollingWindow>> iterator =
|
Iterator<Map.Entry<String, RollingWindow>> iterator =
|
||||||
rollingWindows.entrySet().iterator();
|
rollingWindows.entrySet().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
@ -242,7 +314,7 @@ private TopN getTopUsersForMetric(long time, String metricName,
|
|||||||
}
|
}
|
||||||
LOG.debug("offer window of metric: {} userName: {} sum: {}",
|
LOG.debug("offer window of metric: {} userName: {} sum: {}",
|
||||||
metricName, userName, windowSum);
|
metricName, userName, windowSum);
|
||||||
topN.offer(new NameValuePair(userName, windowSum));
|
topN.add(new User(userName, windowSum));
|
||||||
}
|
}
|
||||||
LOG.debug("topN users size for command {} is: {}",
|
LOG.debug("topN users size for command {} is: {}",
|
||||||
metricName, topN.size());
|
metricName, topN.size());
|
||||||
|
@ -17,12 +17,16 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -56,7 +60,7 @@ public void init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTops() {
|
public void testTops() throws Exception {
|
||||||
long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
|
long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
|
||||||
for (int i = 0; i < users.length; i++)
|
for (int i = 0; i < users.length; i++)
|
||||||
manager.recordMetric(time, "open", users[i], (i + 1) * 2);
|
manager.recordMetric(time, "open", users[i], (i + 1) * 2);
|
||||||
@ -66,11 +70,12 @@ public void testTops() {
|
|||||||
time++;
|
time++;
|
||||||
TopWindow tops = manager.snapshot(time);
|
TopWindow tops = manager.snapshot(time);
|
||||||
|
|
||||||
assertEquals("Unexpected number of ops", 2, tops.getOps().size());
|
assertEquals("Unexpected number of ops", 3, tops.getOps().size());
|
||||||
|
assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
|
||||||
for (Op op : tops.getOps()) {
|
for (Op op : tops.getOps()) {
|
||||||
final List<User> topUsers = op.getTopUsers();
|
final List<User> topUsers = op.getTopUsers();
|
||||||
assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
|
assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
|
||||||
if (op.getOpType() == "open") {
|
if (op.getOpType().equals("open")) {
|
||||||
for (int i = 0; i < topUsers.size(); i++) {
|
for (int i = 0; i < topUsers.size(); i++) {
|
||||||
User user = topUsers.get(i);
|
User user = topUsers.get(i);
|
||||||
assertEquals("Unexpected count for user " + user.getUser(),
|
assertEquals("Unexpected count for user " + user.getUser(),
|
||||||
@ -86,8 +91,9 @@ public void testTops() {
|
|||||||
// move the window forward not to see the "open" results
|
// move the window forward not to see the "open" results
|
||||||
time += WINDOW_LEN_MS - 2;
|
time += WINDOW_LEN_MS - 2;
|
||||||
tops = manager.snapshot(time);
|
tops = manager.snapshot(time);
|
||||||
assertEquals("Unexpected number of ops", 1, tops.getOps().size());
|
assertEquals("Unexpected number of ops", 2, tops.getOps().size());
|
||||||
final Op op = tops.getOps().get(0);
|
assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
|
||||||
|
final Op op = tops.getOps().get(1);
|
||||||
assertEquals("Should only see close ops", "close", op.getOpType());
|
assertEquals("Should only see close ops", "close", op.getOpType());
|
||||||
final List<User> topUsers = op.getTopUsers();
|
final List<User> topUsers = op.getTopUsers();
|
||||||
for (int i = 0; i < topUsers.size(); i++) {
|
for (int i = 0; i < topUsers.size(); i++) {
|
||||||
@ -99,4 +105,158 @@ public void testTops() {
|
|||||||
assertEquals("Unexpected total count for op",
|
assertEquals("Unexpected total count for op",
|
||||||
(1 + users.length) * (users.length / 2), op.getTotalCount());
|
(1 + users.length) * (users.length / 2), op.getTotalCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void windowReset() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
|
||||||
|
int period = 2;
|
||||||
|
RollingWindowManager rollingWindowManager =
|
||||||
|
new RollingWindowManager(config, period);
|
||||||
|
rollingWindowManager.recordMetric(0, "op1", users[0], 3);
|
||||||
|
checkValues(rollingWindowManager, 0, "op1", 3, 3);
|
||||||
|
checkValues(rollingWindowManager, period - 1, "op1", 3, 3);
|
||||||
|
checkValues(rollingWindowManager, period, "op1", 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTotal() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
|
||||||
|
int period = 10;
|
||||||
|
RollingWindowManager rollingWindowManager =
|
||||||
|
new RollingWindowManager(config, period);
|
||||||
|
|
||||||
|
long t = 0;
|
||||||
|
rollingWindowManager.recordMetric(t, "op1", users[0], 3);
|
||||||
|
checkValues(rollingWindowManager, t, "op1", 3, 3);
|
||||||
|
|
||||||
|
// both should have a value.
|
||||||
|
t = (long)(period * .5);
|
||||||
|
rollingWindowManager.recordMetric(t, "op2", users[0], 4);
|
||||||
|
checkValues(rollingWindowManager, t, "op1", 3, 7);
|
||||||
|
checkValues(rollingWindowManager, t, "op2", 4, 7);
|
||||||
|
|
||||||
|
// neither should reset.
|
||||||
|
t = period - 1;
|
||||||
|
checkValues(rollingWindowManager, t, "op1", 3, 7);
|
||||||
|
checkValues(rollingWindowManager, t, "op2", 4, 7);
|
||||||
|
|
||||||
|
// op1 should reset in its next period, but not op2.
|
||||||
|
t = period;
|
||||||
|
rollingWindowManager.recordMetric(10, "op1", users[0], 10);
|
||||||
|
checkValues(rollingWindowManager, t, "op1", 10, 14);
|
||||||
|
checkValues(rollingWindowManager, t, "op2", 4, 14);
|
||||||
|
|
||||||
|
// neither should reset.
|
||||||
|
t = (long)(period * 1.25);
|
||||||
|
rollingWindowManager.recordMetric(t, "op2", users[0], 7);
|
||||||
|
checkValues(rollingWindowManager, t, "op1", 10, 21);
|
||||||
|
checkValues(rollingWindowManager, t, "op2", 11, 21);
|
||||||
|
|
||||||
|
// op2 should reset.
|
||||||
|
t = (long)(period * 1.5);
|
||||||
|
rollingWindowManager.recordMetric(t, "op2", users[0], 13);
|
||||||
|
checkValues(rollingWindowManager, t, "op1", 10, 23);
|
||||||
|
checkValues(rollingWindowManager, t, "op2", 13, 23);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithFuzzing() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
|
||||||
|
int period = users.length/2;
|
||||||
|
RollingWindowManager rollingWindowManager =
|
||||||
|
new RollingWindowManager(config, period);
|
||||||
|
|
||||||
|
String[] ops = {"op1", "op2", "op3", "op4"};
|
||||||
|
Random rand = new Random();
|
||||||
|
for (int i=0; i < 10000; i++) {
|
||||||
|
rollingWindowManager.recordMetric(i, ops[rand.nextInt(ops.length)],
|
||||||
|
users[rand.nextInt(users.length)],
|
||||||
|
rand.nextInt(100));
|
||||||
|
TopWindow window = rollingWindowManager.snapshot(i);
|
||||||
|
checkTotal(window);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOpTotal() throws Exception {
|
||||||
|
int numTopUsers = 2;
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
|
||||||
|
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, numTopUsers);
|
||||||
|
int period = users.length/2;
|
||||||
|
RollingWindowManager rollingWindowManager =
|
||||||
|
new RollingWindowManager(config, period);
|
||||||
|
|
||||||
|
int numOps = 3;
|
||||||
|
rollingWindowManager.recordMetric(0, "op1", "user1", 10);
|
||||||
|
rollingWindowManager.recordMetric(0, "op1", "user2", 20);
|
||||||
|
rollingWindowManager.recordMetric(0, "op1", "user3", 30);
|
||||||
|
|
||||||
|
rollingWindowManager.recordMetric(0, "op2", "user1", 1);
|
||||||
|
rollingWindowManager.recordMetric(0, "op2", "user4", 40);
|
||||||
|
rollingWindowManager.recordMetric(0, "op2", "user5", 50);
|
||||||
|
|
||||||
|
rollingWindowManager.recordMetric(0, "op3", "user6", 1);
|
||||||
|
rollingWindowManager.recordMetric(0, "op3", "user7", 11);
|
||||||
|
rollingWindowManager.recordMetric(0, "op3", "user8", 1);
|
||||||
|
|
||||||
|
TopWindow window = rollingWindowManager.snapshot(0);
|
||||||
|
Assert.assertEquals(numOps + 1, window.getOps().size());
|
||||||
|
|
||||||
|
Op allOp = window.getOps().get(0);
|
||||||
|
Assert.assertEquals(TopConf.ALL_CMDS, allOp.getOpType());
|
||||||
|
List<User> topUsers = allOp.getTopUsers();
|
||||||
|
Assert.assertEquals(numTopUsers * numOps, topUsers.size());
|
||||||
|
// ensure all the top users for each op are present in the total op.
|
||||||
|
for (int i = 1; i < numOps; i++) {
|
||||||
|
Assert.assertTrue(
|
||||||
|
topUsers.containsAll(window.getOps().get(i).getTopUsers()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkValues(RollingWindowManager rwManager, long time,
|
||||||
|
String opType, long value, long expectedTotal) throws Exception {
|
||||||
|
TopWindow window = rwManager.snapshot(time);
|
||||||
|
for (Op windowOp : window.getOps()) {
|
||||||
|
if (opType.equals(windowOp.getOpType())) {
|
||||||
|
assertEquals(value, windowOp.getTotalCount());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(expectedTotal, checkTotal(window));
|
||||||
|
}
|
||||||
|
|
||||||
|
private long checkTotal(TopWindow window) throws Exception {
|
||||||
|
long allOpTotal = 0;
|
||||||
|
long computedOpTotal = 0;
|
||||||
|
|
||||||
|
Map<String, User> userOpTally = new HashMap<>();
|
||||||
|
for (String user : users) {
|
||||||
|
userOpTally.put(user, new User(user, 0));
|
||||||
|
}
|
||||||
|
for (Op windowOp : window.getOps()) {
|
||||||
|
int multiplier;
|
||||||
|
if (TopConf.ALL_CMDS.equals(windowOp.getOpType())) {
|
||||||
|
multiplier = -1;
|
||||||
|
allOpTotal += windowOp.getTotalCount();
|
||||||
|
} else {
|
||||||
|
multiplier = 1;
|
||||||
|
computedOpTotal += windowOp.getTotalCount();
|
||||||
|
}
|
||||||
|
for (User user : windowOp.getAllUsers()) {
|
||||||
|
userOpTally.get(user.getUser()).add((int)(multiplier*user.getCount()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(allOpTotal, computedOpTotal);
|
||||||
|
for (String user : userOpTally.keySet()) {
|
||||||
|
assertEquals(0, userOpTally.get(user).getCount());
|
||||||
|
}
|
||||||
|
return computedOpTotal;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user