HDFS-16689. Standby NameNode crashes when transitioning to Active with in-progress tailer (#4744)

Signed-off-by: Erik Krogen <xkrogen@apache.org>
Co-authored-by: zengqiang.xu <zengqiang.xu@shopee.com>
This commit is contained in:
ZanderXu 2022-12-22 02:06:01 +08:00 committed by GitHub
parent b63b777c84
commit 15b52fb6a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 264 additions and 50 deletions

View File

@ -1654,18 +1654,31 @@ synchronized void logEdit(final int length, final byte[] data) {
endTransaction(start); endTransaction(start);
} }
void recoverUnclosedStreams() throws IOException {
recoverUnclosedStreams(false);
}
/** /**
* Run recovery on all journals to recover any unclosed segments * Run recovery on all journals to recover any unclosed segments
*/ */
synchronized void recoverUnclosedStreams() { synchronized void recoverUnclosedStreams(boolean terminateOnFailure) throws IOException {
Preconditions.checkState( Preconditions.checkState(
state == State.BETWEEN_LOG_SEGMENTS, state == State.BETWEEN_LOG_SEGMENTS,
"May not recover segments - wrong state: %s", state); "May not recover segments - wrong state: %s", state);
try { try {
journalSet.recoverUnfinalizedSegments(); journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) { } catch (IOException ex) {
// All journals have failed, it is handled in logSync. if (terminateOnFailure) {
// TODO: are we sure this is OK? final String msg = "Unable to recover log segments: "
+ "too few journals successfully recovered.";
LOG.error(msg, ex);
synchronized (journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
} else {
throw ex;
}
} }
} }

View File

@ -1389,7 +1389,7 @@ void startActiveServices() throws IOException {
// During startup, we're already open for write during initialization. // During startup, we're already open for write during initialization.
editLog.initJournalsForWrite(); editLog.initJournalsForWrite();
// May need to recover // May need to recover
editLog.recoverUnclosedStreams(); editLog.recoverUnclosedStreams(true);
LOG.info("Catching up to latest edits from old active before " + LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs"); "taking over writer role in edits logs");

View File

@ -311,7 +311,8 @@ public Void run() throws Exception {
startTime - lastLoadTimeMs); startTime - lastLoadTimeMs);
// It is already under the name system lock and the checkpointer // It is already under the name system lock and the checkpointer
// thread is already stopped. No need to acquire any other lock. // thread is already stopped. No need to acquire any other lock.
editsTailed = doTailEdits(); // HDFS-16689. Disable inProgress to use the streaming mechanism
editsTailed = doTailEdits(false);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException(e); throw new IOException(e);
} finally { } finally {
@ -323,9 +324,13 @@ public Void run() throws Exception {
} }
}); });
} }
@VisibleForTesting @VisibleForTesting
public long doTailEdits() throws IOException, InterruptedException { public long doTailEdits() throws IOException, InterruptedException {
return doTailEdits(inProgressOk);
}
private long doTailEdits(boolean enableInProgress) throws IOException, InterruptedException {
Collection<EditLogInputStream> streams; Collection<EditLogInputStream> streams;
FSImage image = namesystem.getFSImage(); FSImage image = namesystem.getFSImage();
@ -334,7 +339,7 @@ public long doTailEdits() throws IOException, InterruptedException {
long startTime = timer.monotonicNow(); long startTime = timer.monotonicNow();
try { try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0, streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, true); null, enableInProgress, true);
} catch (IOException ioe) { } catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits // This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress // log roll, i.e. the last one has been finalized but the new inprogress

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -197,10 +196,9 @@ public void testMismatchedNNIsRejected() throws Exception {
.manageNameDfsDirs(false).format(false).checkExitOnShutdown(false) .manageNameDfsDirs(false).format(false).checkExitOnShutdown(false)
.build(); .build();
fail("New NN with different namespace should have been rejected"); fail("New NN with different namespace should have been rejected");
} catch (ExitException ee) { } catch (IOException ioe) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"Unable to start log segment 1: too few journals", ee); "recoverUnfinalizedSegments failed for too many journals", ioe);
assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled());
} }
} }
} }

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
/**
* One Util class to mock QJM for some UTs not in this package.
*/
public final class SpyQJournalUtil {
private SpyQJournalUtil() {
}
/**
* Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
* @param conf input configuration.
* @param uri input uri.
* @param nsInfo input nameservice info.
* @param nameServiceId input nameservice Id.
* @return one mocked QuorumJournalManager.
* @throws IOException throw IOException.
*/
public static QuorumJournalManager createSpyingQJM(Configuration conf,
URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return new DirectExecutorService();
}
};
return Mockito.spy(logger);
}
};
return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory);
}
/**
* Mock Journals with different response for getJournaledEdits rpc with the input startTxid.
* 1. First journal with one empty response.
* 2. Second journal with one normal response.
* 3. Third journal with one slow response.
* @param manager input QuorumJournalManager.
* @param startTxid input start txid.
*/
public static void mockJNWithEmptyOrSlowResponse(QuorumJournalManager manager, long startTxid) {
List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
Semaphore semaphore = new Semaphore(0);
// Mock JN0 return an empty response.
Mockito.doAnswer(invocation -> {
semaphore.release();
return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
}).when(spies.get(0))
.getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
// Mock JN1 return a normal response.
spyGetJournaledEdits(spies, 1, startTxid, () -> semaphore.release(1));
// Mock JN2 return a slow response
spyGetJournaledEdits(spies, 2, startTxid, () -> semaphore.acquireUninterruptibly(2));
}
public static void spyGetJournaledEdits(List<AsyncLogger> spies,
int jnSpyIdx, long fromTxId, Runnable preHook) {
Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
preHook.run();
@SuppressWarnings("unchecked")
ListenableFuture<GetJournaledEditsResponseProto> result =
(ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
return result;
}).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
}
}

View File

@ -22,6 +22,7 @@
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
import static org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil.spyGetJournaledEdits;
import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows; import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -34,12 +35,10 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -59,7 +58,6 @@
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector; import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@ -68,7 +66,6 @@
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -1135,9 +1132,9 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
writeTxns(stm, 21, 20); writeTxns(stm, 21, 20);
Semaphore semaphore = new Semaphore(0); Semaphore semaphore = new Semaphore(0);
spyGetJournaledEdits(0, 21, () -> semaphore.release(1)); spyGetJournaledEdits(spies, 0, 21, () -> semaphore.release(1));
spyGetJournaledEdits(1, 21, () -> semaphore.release(1)); spyGetJournaledEdits(spies, 1, 21, () -> semaphore.release(1));
spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2)); spyGetJournaledEdits(spies, 2, 21, () -> semaphore.acquireUninterruptibly(2));
List<EditLogInputStream> streams = new ArrayList<>(); List<EditLogInputStream> streams = new ArrayList<>();
qjm.selectInputStreams(streams, 21, true, true); qjm.selectInputStreams(streams, 21, true, true);
@ -1147,17 +1144,6 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
assertEquals(40, streams.get(0).getLastTxId()); assertEquals(40, streams.get(0).getLastTxId());
} }
private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) {
Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
preHook.run();
@SuppressWarnings("unchecked")
ListenableFuture<GetJournaledEditsResponseProto> result =
(ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
return result;
}).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
}
@Test @Test
public void testSelectViaRpcAfterJNRestart() throws Exception { public void testSelectViaRpcAfterJNRestart() throws Exception {
EditLogOutputStream stm = EditLogOutputStream stm =
@ -1210,27 +1196,10 @@ public void testGetJournalAddressListWithResolution() throws Exception {
// expected // expected
} }
} }
private QuorumJournalManager createSpyingQJM() private QuorumJournalManager createSpyingQJM() throws IOException {
throws IOException, URISyntaxException { return closeLater(SpyQJournalUtil.createSpyingQJM(
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null));
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return new DirectExecutorService();
}
};
return Mockito.spy(logger);
}
};
return closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory));
} }
private static void waitForAllPendingCalls(AsyncLoggerSet als) private static void waitForAllPendingCalls(AsyncLoggerSet als)

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.spy;
public class TestHAWithInProgressTail {
private MiniQJMHACluster qjmhaCluster;
private MiniDFSCluster cluster;
private MiniJournalCluster jnCluster;
private NameNode nn0;
private NameNode nn1;
@Before
public void startUp() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 500);
HAUtil.setAllowStandbyReads(conf, true);
qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
cluster = qjmhaCluster.getDfsCluster();
jnCluster = qjmhaCluster.getJournalCluster();
// Get NameNode from cluster to future manual control
nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1);
}
@After
public void tearDown() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
/**
* Test that Standby Node tails multiple segments while catching up
* during the transition to Active.
*/
@Test
public void testFailoverWithAbnormalJN() throws Exception {
cluster.transitionToActive(0);
cluster.waitActive(0);
// Stop EditlogTailer in Standby NameNode.
cluster.getNameNode(1).getNamesystem().getEditLogTailer().stop();
String p = "/testFailoverWhileTailingWithoutCache/";
nn0.getRpcServer().mkdirs(p + 0, FsPermission.getCachePoolDefault(), true);
cluster.transitionToStandby(0);
spyFSEditLog();
cluster.transitionToActive(1);
// we should read them in nn1.
assertNotNull(getFileInfo(nn1, p + 0, true, false, false));
}
private void spyFSEditLog() throws IOException {
FSEditLog spyEditLog = spy(nn1.getNamesystem().getFSImage().getEditLog());
Mockito.doAnswer(invocation -> {
invocation.callRealMethod();
spyOnJASjournal(spyEditLog.getJournalSet());
return null;
}).when(spyEditLog).recoverUnclosedStreams(anyBoolean());
DFSTestUtil.setEditLogForTesting(nn1.getNamesystem(), spyEditLog);
nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
}
private void spyOnJASjournal(JournalSet journalSet) throws IOException {
JournalSet.JournalAndStream jas = journalSet.getAllJournalStreams().get(0);
JournalManager oldManager = jas.getManager();
oldManager.close();
// Create a SpyingQJM
QuorumJournalManager manager = SpyQJournalUtil.createSpyingQJM(nn1.getConf(),
jnCluster.getQuorumJournalURI("ns1"),
nn1.getNamesystem().getNamespaceInfo(), "ns1");
manager.recoverUnfinalizedSegments();
jas.setJournalForTests(manager);
SpyQJournalUtil.mockJNWithEmptyOrSlowResponse(manager, 1);
}
}