HDFS-12323. NameNode terminates after full GC thinking QJM unresponsive if full GC is much longer than timeout. Contributed by Erik Krogen.
This commit is contained in:
parent
b9b607daa7
commit
90894c7262
@ -24,7 +24,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.util.StopWatch;
|
import org.apache.hadoop.util.StopWatch;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Timer;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
@ -35,6 +35,7 @@
|
|||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents a set of calls for which a quorum of results is needed.
|
* Represents a set of calls for which a quorum of results is needed.
|
||||||
* @param <KEY> a key used to identify each of the outgoing calls
|
* @param <KEY> a key used to identify each of the outgoing calls
|
||||||
@ -60,11 +61,12 @@ class QuorumCall<KEY, RESULT> {
|
|||||||
* fraction of the configured timeout for any call.
|
* fraction of the configured timeout for any call.
|
||||||
*/
|
*/
|
||||||
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
|
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
|
||||||
private final StopWatch quorumStopWatch = new StopWatch();
|
private final StopWatch quorumStopWatch;
|
||||||
|
private final Timer timer;
|
||||||
|
|
||||||
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
||||||
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
|
Map<KEY, ? extends ListenableFuture<RESULT>> calls, Timer timer) {
|
||||||
final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>();
|
final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>(timer);
|
||||||
for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
|
for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
|
||||||
Preconditions.checkArgument(e.getValue() != null,
|
Preconditions.checkArgument(e.getValue() != null,
|
||||||
"null future for key: " + e.getKey());
|
"null future for key: " + e.getKey());
|
||||||
@ -83,17 +85,52 @@ public void onSuccess(RESULT res) {
|
|||||||
return qr;
|
return qr;
|
||||||
}
|
}
|
||||||
|
|
||||||
private QuorumCall() {
|
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
||||||
// Only instantiated from factory method above
|
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
|
||||||
|
return create(calls, new Timer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not intended for outside use.
|
||||||
|
*/
|
||||||
|
private QuorumCall() {
|
||||||
|
this(new Timer());
|
||||||
|
}
|
||||||
|
|
||||||
|
private QuorumCall(Timer timer) {
|
||||||
|
// Only instantiated from factory method above
|
||||||
|
this.timer = timer;
|
||||||
|
this.quorumStopWatch = new StopWatch(timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used in conjunction with {@link #getQuorumTimeoutIncreaseMillis(long, int)}
|
||||||
|
* to check for pauses.
|
||||||
|
*/
|
||||||
private void restartQuorumStopWatch() {
|
private void restartQuorumStopWatch() {
|
||||||
quorumStopWatch.reset().start();
|
quorumStopWatch.reset().start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldIncreaseQuorumTimeout(long offset, int millis) {
|
/**
|
||||||
|
* Check for a pause (e.g. GC) since the last time
|
||||||
|
* {@link #restartQuorumStopWatch()} was called. If detected, return the
|
||||||
|
* length of the pause; else, -1.
|
||||||
|
* @param offset Offset the elapsed time by this amount; use if some amount
|
||||||
|
* of pause was expected
|
||||||
|
* @param millis Total length of timeout in milliseconds
|
||||||
|
* @return Length of pause, if detected, else -1
|
||||||
|
*/
|
||||||
|
private long getQuorumTimeoutIncreaseMillis(long offset, int millis) {
|
||||||
long elapsed = quorumStopWatch.now(TimeUnit.MILLISECONDS);
|
long elapsed = quorumStopWatch.now(TimeUnit.MILLISECONDS);
|
||||||
return elapsed + offset > (millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
long pauseTime = elapsed + offset;
|
||||||
|
if (pauseTime > (millis * WAIT_PROGRESS_INFO_THRESHOLD)) {
|
||||||
|
QuorumJournalManager.LOG.info("Pause detected while waiting for " +
|
||||||
|
"QuorumCall response; increasing timeout threshold by pause time " +
|
||||||
|
"of " + pauseTime + " ms.");
|
||||||
|
return pauseTime;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -119,7 +156,7 @@ public synchronized void waitFor(
|
|||||||
int minResponses, int minSuccesses, int maxExceptions,
|
int minResponses, int minSuccesses, int maxExceptions,
|
||||||
int millis, String operationName)
|
int millis, String operationName)
|
||||||
throws InterruptedException, TimeoutException {
|
throws InterruptedException, TimeoutException {
|
||||||
long st = Time.monotonicNow();
|
long st = timer.monotonicNow();
|
||||||
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
||||||
long et = st + millis;
|
long et = st + millis;
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -128,7 +165,7 @@ public synchronized void waitFor(
|
|||||||
if (minResponses > 0 && countResponses() >= minResponses) return;
|
if (minResponses > 0 && countResponses() >= minResponses) return;
|
||||||
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
|
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
|
||||||
if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
|
if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
|
||||||
long now = Time.monotonicNow();
|
long now = timer.monotonicNow();
|
||||||
|
|
||||||
if (now > nextLogTime) {
|
if (now > nextLogTime) {
|
||||||
long waited = now - st;
|
long waited = now - st;
|
||||||
@ -154,8 +191,9 @@ public synchronized void waitFor(
|
|||||||
long rem = et - now;
|
long rem = et - now;
|
||||||
if (rem <= 0) {
|
if (rem <= 0) {
|
||||||
// Increase timeout if a full GC occurred after restarting stopWatch
|
// Increase timeout if a full GC occurred after restarting stopWatch
|
||||||
if (shouldIncreaseQuorumTimeout(0, millis)) {
|
long timeoutIncrease = getQuorumTimeoutIncreaseMillis(0, millis);
|
||||||
et = et + millis;
|
if (timeoutIncrease > 0) {
|
||||||
|
et += timeoutIncrease;
|
||||||
} else {
|
} else {
|
||||||
throw new TimeoutException();
|
throw new TimeoutException();
|
||||||
}
|
}
|
||||||
@ -165,8 +203,9 @@ public synchronized void waitFor(
|
|||||||
rem = Math.max(rem, 1);
|
rem = Math.max(rem, 1);
|
||||||
wait(rem);
|
wait(rem);
|
||||||
// Increase timeout if a full GC occurred after restarting stopWatch
|
// Increase timeout if a full GC occurred after restarting stopWatch
|
||||||
if (shouldIncreaseQuorumTimeout(-rem, millis)) {
|
long timeoutIncrease = getQuorumTimeoutIncreaseMillis(-rem, millis);
|
||||||
et = et + millis;
|
if (timeoutIncrease > 0) {
|
||||||
|
et += timeoutIncrease;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.qjournal.client.QuorumCall;
|
import org.apache.hadoop.util.FakeTimer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
@ -83,4 +83,33 @@ public void testQuorumFailsWithoutResponse() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testQuorumSucceedsWithLongPause() throws Exception {
|
||||||
|
final Map<String, SettableFuture<String>> futures = ImmutableMap.of(
|
||||||
|
"f1", SettableFuture.<String>create());
|
||||||
|
|
||||||
|
FakeTimer timer = new FakeTimer() {
|
||||||
|
private int callCount = 0;
|
||||||
|
@Override
|
||||||
|
public long monotonicNowNanos() {
|
||||||
|
callCount++;
|
||||||
|
if (callCount == 1) {
|
||||||
|
long old = super.monotonicNowNanos();
|
||||||
|
advance(1000000);
|
||||||
|
return old;
|
||||||
|
} else if (callCount == 10) {
|
||||||
|
futures.get("f1").set("first future");
|
||||||
|
return super.monotonicNowNanos();
|
||||||
|
} else {
|
||||||
|
return super.monotonicNowNanos();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
QuorumCall<String, String> q = QuorumCall.create(futures, timer);
|
||||||
|
assertEquals(0, q.countResponses());
|
||||||
|
|
||||||
|
q.waitFor(1, 0, 0, 3000, "test"); // wait for 1 response
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user