HDFS-17250. EditLogTailer#triggerActiveLogRoll should handle thread Interrupted (#6266). Contributed by Haiyang Hu.
Reviewed-by: ZanderXu <zanderxu@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
9a6d00aba4
commit
1fb80ef787
@ -612,8 +612,15 @@ public T call() throws IOException {
|
||||
private NamenodeProtocol getActiveNodeProxy() throws IOException {
|
||||
if (cachedActiveProxy == null) {
|
||||
while (true) {
|
||||
// If the thread is interrupted, quit by returning null.
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
LOG.warn("Interrupted while trying to getActiveNodeProxy.");
|
||||
return null;
|
||||
}
|
||||
|
||||
// if we have reached the max loop count, quit by returning null
|
||||
if ((nnLoopCount / nnCount) >= maxRetries) {
|
||||
LOG.warn("Have reached the max loop count ({}).", nnLoopCount);
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -638,4 +645,24 @@ private NamenodeProtocol getActiveNodeProxy() throws IOException {
|
||||
return cachedActiveProxy;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Test-only accessor that returns the {@code cachedActiveProxy} field as-is.
 *
 * @return the cached {@link NamenodeProtocol} proxy; presumably {@code null}
 *         until a proxy has been cached (getActiveNodeProxy() checks the
 *         field against null before looking one up)
 */
@VisibleForTesting
|
||||
public NamenodeProtocol getCachedActiveProxy() {
|
||||
return cachedActiveProxy;
|
||||
}
|
||||
|
||||
/**
 * Test-only accessor that returns the {@code lastRollTimeMs} field.
 *
 * @return the current value of {@code lastRollTimeMs}; judging by the name
 *         this is a timestamp in milliseconds of the last log roll
 *         (NOTE(review): confirm units and clock source at the assignment site)
 */
@VisibleForTesting
|
||||
public long getLastRollTimeMs() {
|
||||
return lastRollTimeMs;
|
||||
}
|
||||
|
||||
/**
 * Test-only accessor that returns the {@code currentNN} field.
 *
 * @return the {@link RemoteNameNodeInfo} currently held in {@code currentNN};
 *         presumably the remote NameNode most recently selected by the
 *         proxy lookup (verify against where the field is assigned)
 */
@VisibleForTesting
|
||||
public RemoteNameNodeInfo getCurrentNN() {
|
||||
return currentNN;
|
||||
}
|
||||
|
||||
/**
 * Test-only hook that forwards {@code shouldRun} to
 * {@code tailerThread.setShouldRun(boolean)}.
 *
 * @param shouldRun value passed straight through to the tailer thread;
 *        presumably {@code false} asks the background tailing loop to stop
 *        (NOTE(review): confirm in the tailer thread implementation)
 */
@VisibleForTesting
|
||||
public void setShouldRunForTest(boolean shouldRun) {
|
||||
this.tailerThread.setShouldRun(shouldRun);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.BindException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
@ -462,6 +463,80 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Checks that triggerActiveLogRoll() recovers after the thread doing the
 * roll has been interrupted.
 *
 * Outline of the scenario, as set up below:
 * - the roll-edits timeout is configured to 1 (seconds, per the inline comment);
 * - a MiniDFSCluster is created with the argument 3 and index 2 is made active;
 * - the EditLogTailer of namesystem 0 is wrapped in a Mockito spy and its
 *   tailer thread is told not to run;
 * - getNameNodeProxy() is stubbed to return a MultipleNameNodeProxy whose
 *   doWork() counts every invocation, spins until the thread is interrupted
 *   and then throws InterruptedIOException when the current NN id is "nn1",
 *   and otherwise calls rollEditLog() on the cached active proxy;
 * - triggerActiveLogRoll() is called twice; the test then waits up to 10s
 *   (polling every 100ms) for lastRollTimeMs to exceed its initial value and
 *   asserts that doWork() was invoked exactly twice.
 *
 * The cluster is shut down in the finally block whether or not the test passes.
 */
@Test
|
||||
public void testRollEditLogHandleThreadInterruption()
|
||||
throws IOException, InterruptedException, TimeoutException {
|
||||
Configuration conf = getConf();
|
||||
// RollEdits timeout 1s.
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 1);
|
||||
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = createMiniDFSCluster(conf, 3);
|
||||
cluster.transitionToActive(2);
|
||||
EditLogTailer tailer = Mockito.spy(
|
||||
cluster.getNamesystem(0).getEditLogTailer());
|
||||
|
||||
// Stop the edit log tail thread for testing.
|
||||
tailer.setShouldRunForTest(false);
|
||||
|
||||
final AtomicInteger invokedTimes = new AtomicInteger(0);
|
||||
|
||||
// For nn0 run triggerActiveLogRoll, nns is [nn1,nn2].
|
||||
// Mock the NameNodeProxy for testing.
|
||||
// An InterruptedIOException will be thrown when requesting to nn1.
|
||||
when(tailer.getNameNodeProxy()).thenReturn(
|
||||
tailer.new MultipleNameNodeProxy<Void>() {
|
||||
@Override
|
||||
protected Void doWork() throws IOException {
|
||||
invokedTimes.getAndIncrement();
|
||||
if (tailer.getCurrentNN().getNameNodeID().equals("nn1")) {
|
||||
while (true) {
|
||||
Thread.yield();
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
throw new InterruptedIOException("It is an Interrupted IOException.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tailer.getCachedActiveProxy().rollEditLog();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Record the initial LastRollTimeMs value.
|
||||
// This time will be updated only when triggerActiveLogRoll is executed successfully.
|
||||
long initLastRollTimeMs = tailer.getLastRollTimeMs();
|
||||
|
||||
// Execute triggerActiveLogRoll for the first time.
|
||||
// The MultipleNameNodeProxy uses round-robin to look for an active NN to roll the edit log.
|
||||
// Here, a request will be made to nn1, and the main thread will trigger a Timeout and
|
||||
// the doWork() method will throw an InterruptedIOException.
|
||||
// The getActiveNodeProxy() method will determine that the thread is interrupted
|
||||
// and will return null.
|
||||
tailer.triggerActiveLogRoll();
|
||||
|
||||
// Execute triggerActiveLogRoll for the second time.
|
||||
// A request will be made to nn2 and the rollEditLog will be successfully finished and
|
||||
// lastRollTimeMs will be updated.
|
||||
tailer.triggerActiveLogRoll();
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return tailer.getLastRollTimeMs() > initLastRollTimeMs;
|
||||
}
|
||||
}, 100, 10000);
|
||||
|
||||
// The total number of invoked times should be 2.
|
||||
assertEquals(2, invokedTimes.get());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void waitForStandbyToCatchUpWithInProgressEdits(
|
||||
final NameNode standby, final long activeTxId,
|
||||
int maxWaitSec) throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user