From 15b52fb6a4c9901ba3491bff6ee43312023c9932 Mon Sep 17 00:00:00 2001 From: ZanderXu Date: Thu, 22 Dec 2022 02:06:01 +0800 Subject: [PATCH] HDFS-16689. Standby NameNode crashes when transitioning to Active with in-progress tailer (#4744) Signed-off-by: Erik Krogen Co-authored-by: zengqiang.xu --- .../hdfs/server/namenode/FSEditLog.java | 19 ++- .../hdfs/server/namenode/FSNamesystem.java | 2 +- .../server/namenode/ha/EditLogTailer.java | 11 +- .../hadoop/hdfs/qjournal/TestNNWithQJM.java | 6 +- .../hdfs/qjournal/client/SpyQJournalUtil.java | 108 ++++++++++++++++ .../client/TestQuorumJournalManager.java | 47 ++----- .../namenode/TestHAWithInProgressTail.java | 121 ++++++++++++++++++ 7 files changed, 264 insertions(+), 50 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 5bb6872e58..52ff277345 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -1654,18 +1654,31 @@ synchronized void logEdit(final int length, final byte[] data) { endTransaction(start); } + void recoverUnclosedStreams() throws IOException { + recoverUnclosedStreams(false); + } + /** * Run recovery on all journals to recover any unclosed segments */ - synchronized void recoverUnclosedStreams() { + synchronized void recoverUnclosedStreams(boolean terminateOnFailure) throws IOException { Preconditions.checkState( state == State.BETWEEN_LOG_SEGMENTS, "May not recover segments - wrong state: %s", state); try { journalSet.recoverUnfinalizedSegments(); } catch (IOException ex) { - // All journals have failed, it is handled in logSync. - // TODO: are we sure this is OK? + if (terminateOnFailure) { + final String msg = "Unable to recover log segments: " + + "too few journals successfully recovered."; + LOG.error(msg, ex); + synchronized (journalSetLock) { + IOUtils.cleanupWithLogger(LOG, journalSet); + } + terminate(1, msg); + } else { + throw ex; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index a26902f5de..5e4f0d520a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1389,7 +1389,7 @@ void startActiveServices() throws IOException { // During startup, we're already open for write during initialization. editLog.initJournalsForWrite(); // May need to recover - editLog.recoverUnclosedStreams(); + editLog.recoverUnclosedStreams(true); LOG.info("Catching up to latest edits from old active before " + "taking over writer role in edits logs"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 6921e204ae..d43035ba73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -311,7 +311,8 @@ public Void run() throws Exception { startTime - lastLoadTimeMs); // It is already under the name system lock and the checkpointer // thread is already stopped. No need to acquire any other lock. - editsTailed = doTailEdits(); + // HDFS-16689. Disable inProgress to use the streaming mechanism + editsTailed = doTailEdits(false); } catch (InterruptedException e) { throw new IOException(e); } finally { @@ -323,9 +324,13 @@ public Void run() throws Exception { } }); } - + @VisibleForTesting public long doTailEdits() throws IOException, InterruptedException { + return doTailEdits(inProgressOk); + } + + private long doTailEdits(boolean enableInProgress) throws IOException, InterruptedException { Collection streams; FSImage image = namesystem.getFSImage(); @@ -334,7 +339,7 @@ public long doTailEdits() throws IOException, InterruptedException { long startTime = timer.monotonicNow(); try { streams = editLog.selectInputStreams(lastTxnId + 1, 0, - null, inProgressOk, true); + null, enableInProgress, true); } catch (IOException ioe) { // This is acceptable. If we try to tail edits in the middle of an edits // log roll, i.e. the last one has been finalized but the new inprogress diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java index 4483667e31..6a340024c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java @@ -33,7 +33,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.ExitUtil.ExitException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -197,10 +196,9 @@ public void testMismatchedNNIsRejected() throws Exception { .manageNameDfsDirs(false).format(false).checkExitOnShutdown(false) .build(); fail("New NN with different namespace should have been rejected"); - } catch (ExitException ee) { + } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( - "Unable to start log segment 1: too few journals", ee); - assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled()); + "recoverUnfinalizedSegments failed for too many journals", ioe); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java new file mode 100644 index 0000000000..5816862704 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; + +/** + * One Util class to mock QJM for some UTs not in this package. + */ +public final class SpyQJournalUtil { + + private SpyQJournalUtil() { + } + + /** + * Mock a QuorumJournalManager with input uri, nsInfo and namServiceId. + * @param conf input configuration. + * @param uri input uri. + * @param nsInfo input nameservice info. + * @param nameServiceId input nameservice Id. + * @return one mocked QuorumJournalManager. + * @throws IOException throw IOException. + */ + public static QuorumJournalManager createSpyingQJM(Configuration conf, + URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException { + AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { + @Override + public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, + String journalId, String nameServiceId, InetSocketAddress addr) { + AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, + nameServiceId, addr) { + protected ExecutorService createSingleThreadExecutor() { + // Don't parallelize calls to the quorum in the tests. + // This makes the tests more deterministic. + return new DirectExecutorService(); + } + }; + return Mockito.spy(logger); + } + }; + return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory); + } + + /** + * Mock Journals with different response for getJournaledEdits rpc with the input startTxid. + * 1. First journal with one empty response. + * 2. Second journal with one normal response. + * 3. Third journal with one slow response. + * @param manager input QuorumJournalManager. + * @param startTxid input start txid. + */ + public static void mockJNWithEmptyOrSlowResponse(QuorumJournalManager manager, long startTxid) { + List spies = manager.getLoggerSetForTests().getLoggersForTests(); + Semaphore semaphore = new Semaphore(0); + + // Mock JN0 return an empty response. + Mockito.doAnswer(invocation -> { + semaphore.release(); + return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build(); + }).when(spies.get(0)) + .getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + + // Mock JN1 return a normal response. + spyGetJournaledEdits(spies, 1, startTxid, () -> semaphore.release(1)); + + // Mock JN2 return a slow response + spyGetJournaledEdits(spies, 2, startTxid, () -> semaphore.acquireUninterruptibly(2)); + } + + public static void spyGetJournaledEdits(List spies, + int jnSpyIdx, long fromTxId, Runnable preHook) { + Mockito.doAnswer((Answer>) invocation -> { + preHook.run(); + @SuppressWarnings("unchecked") + ListenableFuture result = + (ListenableFuture) invocation.callRealMethod(); + return result; + }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index e2ee2e365d..0beaca59ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns; +import static org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil.spyGetJournaledEdits; import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,12 +35,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.net.URISyntaxException; import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -59,7 +58,6 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; -import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector; import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; @@ -68,7 +66,6 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.test.GenericTestUtils; @@ -1135,9 +1132,9 @@ public void testSelectViaRPCAfterJNJitter() throws Exception { writeTxns(stm, 21, 20); Semaphore semaphore = new Semaphore(0); - spyGetJournaledEdits(0, 21, () -> semaphore.release(1)); - spyGetJournaledEdits(1, 21, () -> semaphore.release(1)); - spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2)); + spyGetJournaledEdits(spies, 0, 21, () -> semaphore.release(1)); + spyGetJournaledEdits(spies, 1, 21, () -> semaphore.release(1)); + spyGetJournaledEdits(spies, 2, 21, () -> semaphore.acquireUninterruptibly(2)); List streams = new ArrayList<>(); qjm.selectInputStreams(streams, 21, true, true); @@ -1147,17 +1144,6 @@ public void testSelectViaRPCAfterJNJitter() throws Exception { assertEquals(40, streams.get(0).getLastTxId()); } - private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) { - Mockito.doAnswer((Answer>) invocation -> { - preHook.run(); - @SuppressWarnings("unchecked") - ListenableFuture result = - (ListenableFuture) invocation.callRealMethod(); - return result; - }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId, - QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); - } - @Test public void testSelectViaRpcAfterJNRestart() throws Exception { EditLogOutputStream stm = @@ -1210,27 +1196,10 @@ public void testGetJournalAddressListWithResolution() throws Exception { // expected } } - - private QuorumJournalManager createSpyingQJM() - throws IOException, URISyntaxException { - AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { - @Override - public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, String nameServiceId, InetSocketAddress addr) { - AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, - nameServiceId, addr) { - protected ExecutorService createSingleThreadExecutor() { - // Don't parallelize calls to the quorum in the tests. - // This makes the tests more deterministic. - return new DirectExecutorService(); - } - }; - - return Mockito.spy(logger); - } - }; - return closeLater(new QuorumJournalManager( - conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory)); + + private QuorumJournalManager createSpyingQJM() throws IOException { + return closeLater(SpyQJournalUtil.createSpyingQJM( + conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null)); } private static void waitForAllPendingCalls(AsyncLoggerSet als) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java new file mode 100644 index 0000000000..746503d845 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; +import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.spy; + +public class TestHAWithInProgressTail { + private MiniQJMHACluster qjmhaCluster; + private MiniDFSCluster cluster; + private MiniJournalCluster jnCluster; + private NameNode nn0; + private NameNode nn1; + + @Before + public void startUp() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 500); + HAUtil.setAllowStandbyReads(conf, true); + qjmhaCluster = new MiniQJMHACluster.Builder(conf).build(); + cluster = qjmhaCluster.getDfsCluster(); + jnCluster = qjmhaCluster.getJournalCluster(); + + // Get NameNode from cluster to future manual control + nn0 = cluster.getNameNode(0); + nn1 = cluster.getNameNode(1); + } + + @After + public void tearDown() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + + /** + * Test that Standby Node tails multiple segments while catching up + * during the transition to Active. + */ + @Test + public void testFailoverWithAbnormalJN() throws Exception { + cluster.transitionToActive(0); + cluster.waitActive(0); + + // Stop EditlogTailer in Standby NameNode. + cluster.getNameNode(1).getNamesystem().getEditLogTailer().stop(); + + String p = "/testFailoverWhileTailingWithoutCache/"; + nn0.getRpcServer().mkdirs(p + 0, FsPermission.getCachePoolDefault(), true); + + cluster.transitionToStandby(0); + spyFSEditLog(); + cluster.transitionToActive(1); + + // we should read them in nn1. + assertNotNull(getFileInfo(nn1, p + 0, true, false, false)); + } + + private void spyFSEditLog() throws IOException { + FSEditLog spyEditLog = spy(nn1.getNamesystem().getFSImage().getEditLog()); + Mockito.doAnswer(invocation -> { + invocation.callRealMethod(); + spyOnJASjournal(spyEditLog.getJournalSet()); + return null; + }).when(spyEditLog).recoverUnclosedStreams(anyBoolean()); + + DFSTestUtil.setEditLogForTesting(nn1.getNamesystem(), spyEditLog); + nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog); + } + + private void spyOnJASjournal(JournalSet journalSet) throws IOException { + JournalSet.JournalAndStream jas = journalSet.getAllJournalStreams().get(0); + JournalManager oldManager = jas.getManager(); + oldManager.close(); + + // Create a SpyingQJM + QuorumJournalManager manager = SpyQJournalUtil.createSpyingQJM(nn1.getConf(), + jnCluster.getQuorumJournalURI("ns1"), + nn1.getNamesystem().getNamespaceInfo(), "ns1"); + manager.recoverUnfinalizedSegments(); + jas.setJournalForTests(manager); + + SpyQJournalUtil.mockJNWithEmptyOrSlowResponse(manager, 1); + } +}