HADOOP-16192. Fix CallQueue backoff bugs: perform backoff when add() is used and update backoff when refreshed.

This commit is contained in:
Erik Krogen 2019-03-14 10:12:58 -07:00
parent 2db38abffc
commit 8c95cb9d6b
4 changed files with 27 additions and 3 deletions

View File

@ -222,12 +222,21 @@ public void put(E e) throws InterruptedException {
} else if (shouldBackOff(e)) {
throwBackoff();
} else {
add(e);
// No need to re-check backoff criteria since they were just checked
addInternal(e, false);
}
}
@Override
public boolean add(E e) {
return addInternal(e, true);
}
@VisibleForTesting
boolean addInternal(E e, boolean checkBackoff) {
if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) {
throwBackoff();
}
try {
return putRef.get().add(e);
} catch (CallQueueOverflowException ex) {

View File

@ -698,6 +698,7 @@ public synchronized void refreshCallQueue(Configuration conf) {
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
callQueue.swapQueue(getSchedulerClass(prefix, conf),
getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
}
/**

View File

@ -434,5 +434,18 @@ public void testCallQueueOverflowExceptions() throws Exception {
}
verify(queue, times(0)).put(call);
verify(queue, times(0)).add(call);
// backoff is enabled, add + scheduler backoff = overflow exception.
reset(queue);
cqm.setClientBackoffEnabled(true);
doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
try {
cqm.add(call);
fail("didn't fail");
} catch (Exception ex) {
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
}
verify(queue, times(0)).put(call);
verify(queue, times(0)).add(call);
}
}

View File

@ -95,6 +95,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@ -1133,7 +1134,7 @@ public Void call() throws ServiceException, InterruptedException {
return null;
}
}));
verify(spy, timeout(500).times(i + 1)).add(any());
verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false));
}
try {
proxy.sleep(null, newSleepRequest(100));
@ -1204,7 +1205,7 @@ public Void call() throws ServiceException, InterruptedException {
return null;
}
}));
verify(spy, timeout(500).times(i + 1)).add(any());
verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false));
}
// Start another sleep RPC call and verify the call is backed off due to
// avg response time(3s) exceeds threshold (2s).