diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java index 6182477e1f..652cb0439a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java @@ -53,9 +53,4 @@ public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) { @VisibleForTesting public void mockAnException() { } - - @VisibleForTesting - public void mockJNStreams() throws IOException { - - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 35ef57caf3..347fec8586 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -112,7 +112,6 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector; import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotDeletionGc; import org.apache.hadoop.thirdparty.protobuf.ByteString; @@ -1390,8 +1389,6 @@ void startActiveServices() throws IOException { editLog.initJournalsForWrite(); // May need to recover editLog.recoverUnclosedStreams(); - - BlockManagerFaultInjector.getInstance().mockJNStreams(); LOG.info("Catching up to latest edits from old active before " + "taking over writer role in edits logs"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 3c8cfe4413..f72ec7c917 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -283,7 +283,7 @@ public void stop() throws IOException { } @VisibleForTesting - public FSEditLog getEditLog() { + FSEditLog getEditLog() { return editLog; } @@ -311,7 +311,7 @@ public Void run() throws Exception { startTime - lastLoadTimeMs); // It is already under the name system lock and the checkpointer // thread is already stopped. No need to acquire any other lock. - editsTailed = doTailEdits(false); + editsTailed = doTailEdits(); } catch (InterruptedException e) { throw new IOException(e); } finally { @@ -326,10 +326,6 @@ public Void run() throws Exception { @VisibleForTesting public long doTailEdits() throws IOException, InterruptedException { - return doTailEdits(true); - } - - private long doTailEdits(boolean onlyDurableTxns) throws IOException, InterruptedException { Collection streams; FSImage image = namesystem.getFSImage(); @@ -338,7 +334,7 @@ private long doTailEdits(boolean onlyDurableTxns) throws IOException, Interrupte long startTime = timer.monotonicNow(); try { streams = editLog.selectInputStreams(lastTxnId + 1, 0, - null, inProgressOk, onlyDurableTxns); + null, inProgressOk, true); } catch (IOException ioe) { // This is acceptable. If we try to tail edits in the middle of an edits // log roll, i.e. the last one has been finalized but the new inprogress diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java deleted file mode 100644 index 336aa8ece0..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.qjournal.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; -import org.mockito.Mockito; - -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.Executors; - -import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.eq; - -/** - * One Util class to mock QJuournals for some UTs not in this package. - */ -public final class SpyQJournalUtil { - - private SpyQJournalUtil() { - } - - /** - * Mock a QuorumJournalManager with input uri, nsInfo and namServiceId. - * @param conf input configuration. - * @param uri input uri. - * @param nsInfo input nameservice info. - * @param nameServiceId input nameservice Id. - * @return one mocked QuorumJournalManager. - * @throws IOException throw IOException. - */ - public static QuorumJournalManager createSpyingQJM(Configuration conf, - URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException { - AsyncLogger.Factory spyFactory = (conf1, nsInfo1, journalId1, nameServiceId1, addr1) -> { - AsyncLogger logger = new IPCLoggerChannel(conf1, nsInfo1, journalId1, nameServiceId1, addr1); - return Mockito.spy(logger); - }; - return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory); - } - - /** - * Try to mock one abnormal JournalNode with one empty response - * for getJournaledEdits rpc with startTxid. - * @param manager QuorumJournalmanager. - * @param startTxid input StartTxid. - */ - public static void mockOneJNReturnEmptyResponse( - QuorumJournalManager manager, long startTxid, int journalIndex) { - List spies = manager.getLoggerSetForTests().getLoggersForTests(); - - // Mock JN0 return an empty response. - GetJournaledEditsResponseProto responseProto = GetJournaledEditsResponseProto - .newBuilder().setTxnCount(journalIndex).build(); - ListenableFuture ret = Futures.immediateFuture(responseProto); - Mockito.doReturn(ret).when(spies.get(journalIndex)) - .getJournaledEdits(eq(startTxid), eq(QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT)); - } - - /** - * Try to mock one abnormal JournalNode with slow response for - * getJournaledEdits rpc with startTxid. - * @param manager input QuormJournalManager. - * @param startTxid input start txid. - * @param sleepTime sleep time. - * @param journalIndex the journal index need to be mocked. - */ - public static void mockOneJNWithSlowResponse( - QuorumJournalManager manager, long startTxid, int sleepTime, int journalIndex) { - List spies = manager.getLoggerSetForTests().getLoggersForTests(); - - ListeningExecutorService service = MoreExecutors.listeningDecorator( - Executors.newSingleThreadExecutor()); - Mockito.doAnswer(invocation -> service.submit(() -> { - Thread.sleep(sleepTime); - ListenableFuture future = null; - try { - future = (ListenableFuture) invocation.callRealMethod(); - } catch (Throwable e) { - fail("getJournaledEdits failed " + e.getMessage()); - } - return future.get(); - })).when(spies.get(journalIndex)) - .getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java deleted file mode 100644 index c19a45967d..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; -import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; -import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; -import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo; -import static org.junit.Assert.assertNotNull; - -public class TestHAWithInProgressTail { - private MiniQJMHACluster qjmhaCluster; - private MiniDFSCluster cluster; - private MiniJournalCluster jnCluster; - private NameNode nn0; - private NameNode nn1; - - @Before - public void startUp() throws IOException { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); - conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 500); - HAUtil.setAllowStandbyReads(conf, true); - qjmhaCluster = new MiniQJMHACluster.Builder(conf).build(); - cluster = qjmhaCluster.getDfsCluster(); - jnCluster = qjmhaCluster.getJournalCluster(); - - // Get NameNode from cluster to future manual control - nn0 = cluster.getNameNode(0); - nn1 = cluster.getNameNode(1); - } - - @After - public void tearDown() throws IOException { - if (qjmhaCluster != null) { - qjmhaCluster.shutdown(); - } - } - - - /** - * Test that Standby Node tails multiple segments while catching up - * during the transition to Active. - */ - @Test - public void testFailoverWithAbnormalJN() throws Exception { - cluster.transitionToActive(0); - cluster.waitActive(0); - - BlockManagerFaultInjector.instance = new BlockManagerFaultInjector() { - @Override - public void mockJNStreams() throws IOException { - spyOnJASjournal(); - } - }; - - // Stop EditlogTailer in Standby NameNode. - cluster.getNameNode(1).getNamesystem().getEditLogTailer().stop(); - - String p = "/testFailoverWhileTailingWithoutCache/"; - mkdirs(nn0, p + 0, p + 1, p + 2, p + 3, p + 4); - mkdirs(nn0, p + 5, p + 6, p + 7, p + 8, p + 9); - mkdirs(nn0, p + 10, p + 11, p + 12, p + 13, p + 14); - - cluster.transitionToStandby(0); - - cluster.transitionToActive(1); - - // we should read them in nn1. - waitForFileInfo(nn1, p + 0, p + 1, p + 14); - } - - private void spyOnJASjournal() throws IOException { - JournalSet.JournalAndStream jas = nn1.getNamesystem().getEditLogTailer() - .getEditLog().getJournalSet().getAllJournalStreams().get(0); - - JournalManager oldManager = jas.getManager(); - oldManager.close(); - - // Create a SpyingQJM - QuorumJournalManager manager = SpyQJournalUtil.createSpyingQJM(nn1.getConf(), - jnCluster.getQuorumJournalURI("ns1"), - nn1.getNamesystem().getNamespaceInfo(), "ns1"); - manager.recoverUnfinalizedSegments(); - jas.setJournalForTests(manager); - - // First JournalNode with an empty response. - SpyQJournalUtil.mockOneJNReturnEmptyResponse(manager, 1L, 0); - // Second JournalNode with a slow response. - SpyQJournalUtil.mockOneJNWithSlowResponse(manager, 1L, 3000, 1); - } - - /** - * Create the given directories on the provided NameNode. - */ - private static void mkdirs(NameNode nameNode, String... dirNames) - throws Exception { - for (String dirName : dirNames) { - nameNode.getRpcServer().mkdirs(dirName, - FsPermission.createImmutable((short) 0755), true); - } - } - - /** - * Wait up to 1 second until the given NameNode is aware of the existing of - * all of the provided fileNames. - */ - private static void waitForFileInfo(NameNode nn, String... fileNames) - throws Exception { - for (String fileName : fileNames){ - assertNotNull(getFileInfo(nn, fileName, true, false, false)); - } - } -}