HADOOP-17280. Service-user cost shouldn't be accumulated to totalDecayedCallCost and totalRawCallCost. Contributed by Jinglun.

He Xiaoqiao 2020-09-30 11:35:24 +08:00
parent 4c5ad57818
commit a490d87eb7
4 changed files with 116 additions and 12 deletions
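In short: DecayRpcScheduler used to add every identity's cost to the shared totals (totalDecayedCallCost / totalRawCallCost), so heavy service-user traffic inflated the denominator used to compute ordinary users' priorities. This patch keeps service-user cost in separate counters so the shared totals cover normal users only. A rough worked example, using the call counts from the new testServiceUsersCase2 below and the scheduler's default decay factor of 0.5 (numbers are illustrative only):

  10 calls from "user1" plus 50 calls from service-user "service"
  before decay: totalDecayedCallCost = 10, totalServiceUserDecayedCallCost = 50
  after decay:  totalDecayedCallCost = 5,  totalServiceUserDecayedCallCost = 25
  user1's share of the scheduling total: 10/10 = 100%, instead of 10/60 ≈ 17% under the old accounting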

DecayRpcScheduler.java

@@ -152,10 +152,15 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final ConcurrentHashMap<Object, List<AtomicLong>> callCosts =
       new ConcurrentHashMap<Object, List<AtomicLong>>();
 
-  // Should be the sum of all AtomicLongs in decayed callCosts
+  // Should be the sum of all AtomicLongs in decayed callCosts except
+  // service-user.
   private final AtomicLong totalDecayedCallCost = new AtomicLong();
-  // The sum of all AtomicLongs in raw callCosts
+  // The sum of all AtomicLongs in raw callCosts except service-user.
   private final AtomicLong totalRawCallCost = new AtomicLong();
+  // Should be the sum of all AtomicLongs in decayed callCosts of service-user.
+  private final AtomicLong totalServiceUserDecayedCallCost = new AtomicLong();
+  // The sum of all AtomicLongs in raw callCosts of service-user.
+  private final AtomicLong totalServiceUserRawCallCost = new AtomicLong();
 
   // Track total call count and response time in current decay window
@@ -446,6 +451,8 @@ private void decayCurrentCosts() {
     try {
       long totalDecayedCost = 0;
       long totalRawCost = 0;
+      long totalServiceUserDecayedCost = 0;
+      long totalServiceUserRawCost = 0;
       Iterator<Map.Entry<Object, List<AtomicLong>>> it =
           callCosts.entrySet().iterator();
@@ -456,10 +463,15 @@ private void decayCurrentCosts() {
         // Compute the next value by reducing it by the decayFactor
-        totalRawCost += rawCost.get();
         long currentValue = decayedCost.get();
         long nextValue = (long) (currentValue * decayFactor);
-        totalDecayedCost += nextValue;
+        if (isServiceUser((String) entry.getKey())) {
+          totalServiceUserRawCost += rawCost.get();
+          totalServiceUserDecayedCost += nextValue;
+        } else {
+          totalRawCost += rawCost.get();
+          totalDecayedCost += nextValue;
+        }
         decayedCost.set(nextValue);
         LOG.debug(
@@ -478,9 +490,13 @@ private void decayCurrentCosts() {
       // Update the total so that we remain in sync
       totalDecayedCallCost.set(totalDecayedCost);
       totalRawCallCost.set(totalRawCost);
+      totalServiceUserDecayedCallCost.set(totalServiceUserDecayedCost);
+      totalServiceUserRawCallCost.set(totalServiceUserRawCost);
 
-      LOG.debug("After decaying the stored costs, totalDecayedCost: {}, " +
-          "totalRawCallCost: {}.", totalDecayedCost, totalRawCost);
+      LOG.debug("After decaying the stored costs, totalDecayedCost: {}, "
+          + "totalRawCallCost: {}, totalServiceUserDecayedCost: {},"
+          + " totalServiceUserRawCost: {}.", totalDecayedCost, totalRawCost,
+          totalServiceUserDecayedCost, totalServiceUserRawCost);
 
       // Now refresh the cache of scheduling decisions
       recomputeScheduleCache();
@@ -538,8 +554,13 @@ private void addCost(Object identity, long costDelta) {
     }
 
     // Update the total
-    totalDecayedCallCost.getAndAdd(costDelta);
-    totalRawCallCost.getAndAdd(costDelta);
+    if (!isServiceUser((String) identity)) {
+      totalDecayedCallCost.getAndAdd(costDelta);
+      totalRawCallCost.getAndAdd(costDelta);
+    } else {
+      totalServiceUserDecayedCallCost.getAndAdd(costDelta);
+      totalServiceUserRawCallCost.getAndAdd(costDelta);
+    }
 
     // At this point value is guaranteed to be not null. It may however have
     // been clobbered from callCosts. Nonetheless, we return what
@@ -893,6 +914,14 @@ public long getTotalRawCallVolume() {
     return totalRawCallCost.get();
   }
 
+  public long getTotalServiceUserCallVolume() {
+    return totalServiceUserDecayedCallCost.get();
+  }
+
+  public long getTotalServiceUserRawCallVolume() {
+    return totalServiceUserRawCallCost.get();
+  }
+
   public long[] getResponseTimeCountInLastWindow() {
     long[] ret = new long[responseTimeCountInLastWindow.length()];
     for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
@@ -922,6 +951,8 @@ public void getMetrics(MetricsCollector collector, boolean all) {
       addAvgResponseTimePerPriority(rb);
       addCallVolumePerPriority(rb);
       addRawCallVolume(rb);
+      addServiceUserDecayedCallVolume(rb);
+      addServiceUserRawCallVolume(rb);
     } catch (Exception e) {
       LOG.warn("Exception thrown while metric collection. Exception : "
           + e.getMessage());
@@ -945,6 +976,20 @@ private void addRawCallVolume(MetricsRecordBuilder rb) {
         "incoming Call Volume"), getTotalRawCallVolume());
   }
 
+  // Key: ServiceUserDecayedCallVolume.
+  private void addServiceUserDecayedCallVolume(MetricsRecordBuilder rb) {
+    rb.addCounter(Interns.info("ServiceUserDecayedCallVolume",
+        "Service-user Decayed Total incoming Call Volume"),
+        getTotalServiceUserCallVolume());
+  }
+
+  // Key: ServiceUserCallVolume.
+  private void addServiceUserRawCallVolume(MetricsRecordBuilder rb) {
+    rb.addCounter(Interns.info("ServiceUserCallVolume",
+        "Service-user Raw Total incoming Call Volume"),
+        getTotalServiceUserRawCallVolume());
+  }
+
   // Key: Priority.0.CompletedCallVolume
   private void addCallVolumePerPriority(MetricsRecordBuilder rb) {
     for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
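For orientation, here is a minimal stand-alone sketch (not part of the patch; the "ipc.8020" namespace and the "etl" user name are made-up examples) showing how the counters added above can be read once a scheduler is constructed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.DecayRpcScheduler;

public class ServiceUserVolumeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical port namespace and service-user name, for illustration only.
    conf.set("ipc.8020.decay-scheduler.service-users", "etl");
    DecayRpcScheduler scheduler = new DecayRpcScheduler(4, "ipc.8020", conf);

    // After this change the "total" getters cover normal users only, while the
    // two new getters report the service-user share kept in separate counters.
    System.out.println("decayed volume (normal users):  "
        + scheduler.getTotalCallVolume());
    System.out.println("decayed volume (service-users): "
        + scheduler.getTotalServiceUserCallVolume());
    System.out.println("raw volume (service-users):     "
        + scheduler.getTotalServiceUserRawCallVolume());
  }
}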

core-default.xml

@@ -2588,7 +2588,8 @@
   <name>ipc.[port_number].decay-scheduler.service-users</name>
   <value></value>
   <description>Service users will always be scheduled into the highest-priority
-    queue. They are specified as a comma-separated list.
+    queue and won't be included in the priority computation of normal user
+    calls. They are specified as a comma-separated list.
   </description>
 </property>
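For illustration only, a setting of this shape (the port number 8020 and the account names are hypothetical) marks two accounts as service users for that RPC server:

  <property>
    <name>ipc.8020.decay-scheduler.service-users</name>
    <value>hbase,etl</value>
  </property>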

FairCallQueue.md

@@ -92,7 +92,8 @@ provider simply uses the username of the client submitting the request. However,
 to performing throttling based on other groupings, or using an external identity provider.
 
 If particular users submit important requests and you don't want to limit them, you can set them up as the
-**service-users**. They are always scheduled into the high-priority queue.
+**service-users**. They are always scheduled into the high-priority queue and won't be included in the priority
+computation of normal user calls.
 
 ### Cost-based Fair Call Queue
@@ -141,7 +142,7 @@ omitted.
 | decay-scheduler.backoff.responsetime.enable | DecayRpcScheduler | Whether or not to enable the backoff by response time feature. | false |
 | decay-scheduler.backoff.responsetime.thresholds | DecayRpcScheduler | The response time thresholds, as time durations, for each priority queue. If the average response time for a queue is above this threshold, backoff will occur in lower priority queues. This should be a comma-separated list of length equal to the number of priority levels. | Threshold increases by 10s per level (e.g., for 4 levels: `10s,20s,30s,40s`) |
 | decay-scheduler.metrics.top.user.count | DecayRpcScheduler | The number of top (i.e., heaviest) users to emit metric information about. | 10 |
-| decay-scheduler.service-users | DecayRpcScheduler | Service users will always be scheduled into the highest-priority queue. They are specified as a comma-separated list. | |
+| decay-scheduler.service-users | DecayRpcScheduler | Service users will always be scheduled into the highest-priority queue and won't be included in the priority computation of normal user calls. They are specified as a comma-separated list. | |
 | weighted-cost.lockshared | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds a shared (read) lock. | 10 |
 | weighted-cost.lockexclusive | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds an exclusive (write) lock. | 100 |
 | weighted-cost.{handler,lockfree,response} | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phases which do not involve holding a lock. See `org.apache.hadoop.ipc.ProcessingDetails.Timing` for more details on each phase. | 1 |

TestDecayRpcScheduler.java

@@ -24,6 +24,7 @@
 import org.eclipse.jetty.util.ajax.JSON;
 import org.junit.Test;
 
+import static org.apache.hadoop.ipc.DecayRpcScheduler.IPC_DECAYSCHEDULER_THRESHOLDS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -389,8 +390,11 @@ private int getPriorityIncrementCallCount(String callId) {
     return priority;
   }
 
+  /**
+   * Test computing priorities and priority cache of users and service-users.
+   */
   @Test
-  public void testServiceUsers() {
+  public void testServiceUsersCase1() {
     Configuration conf = new Configuration();
     conf.setLong("ipc.19."
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
@@ -421,4 +425,57 @@
     assertEquals(0L, summaryMap.get("service1"));
     assertEquals(0L, summaryMap.get("service2"));
   }
+
+  /**
+   * Test the service users' calls are not included when computing user's call
+   * priority.
+   */
+  @Test
+  public void testServiceUsersCase2() {
+    final int level = 4;
+    Configuration conf = new Configuration();
+    conf.setLong("ipc.20."
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
+    conf.set("ipc.20." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY,
+        "service");
+    conf.set(IPC_DECAYSCHEDULER_THRESHOLDS_KEY, "0.125,0.25,0.5");
+    scheduler = new DecayRpcScheduler(level, "ipc.20", conf);
+
+    // test total costs.
+    for (int i = 0; i < 10; i++) {
+      getPriorityIncrementCallCount("user1");
+    }
+    for (int i = 0; i < 50; i++) {
+      getPriorityIncrementCallCount("service");
+    }
+    assertEquals(10, scheduler.getTotalCallVolume());
+    assertEquals(10, scheduler.getTotalRawCallVolume());
+    assertEquals(50, scheduler.getTotalServiceUserCallVolume());
+    assertEquals(50, scheduler.getTotalServiceUserRawCallVolume());
+    // test priority of normal user.
+    assertEquals(level - 1, scheduler.getPriorityLevel(mockCall("user1")));
+
+    // test total costs after decay.
+    scheduler.forceDecay();
+    assertEquals(5, scheduler.getTotalCallVolume());
+    assertEquals(10, scheduler.getTotalRawCallVolume());
+    assertEquals(25, scheduler.getTotalServiceUserCallVolume());
+    assertEquals(50, scheduler.getTotalServiceUserRawCallVolume());
+    // test priority of normal user.
+    assertEquals(level - 1, scheduler.getPriorityLevel(mockCall("user1")));
+
+    // test total costs again.
+    for (int i = 0; i < 10; i++) {
+      getPriorityIncrementCallCount("user1");
+    }
+    for (int i = 0; i < 50; i++) {
+      getPriorityIncrementCallCount("service");
+    }
+    assertEquals(15, scheduler.getTotalCallVolume());
+    assertEquals(20, scheduler.getTotalRawCallVolume());
+    assertEquals(75, scheduler.getTotalServiceUserCallVolume());
+    assertEquals(100, scheduler.getTotalServiceUserRawCallVolume());
+    // test priority of normal user.
+    assertEquals(level - 1, scheduler.getPriorityLevel(mockCall("user1")));
+  }
 }