Revert "HDFS-16689. NameNode may crash when transitioning to Active with in-progress tailer if there are some abnormal JNs. (#4628)" (#4743)
This commit is contained in:
parent
906ae5138e
commit
86cc96c493
@ -53,9 +53,4 @@ public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
|
||||
@VisibleForTesting
|
||||
public void mockAnException() {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void mockJNStreams() throws IOException {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -112,7 +112,6 @@
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotDeletionGc;
|
||||
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
||||
@ -1390,8 +1389,6 @@ void startActiveServices() throws IOException {
|
||||
editLog.initJournalsForWrite();
|
||||
// May need to recover
|
||||
editLog.recoverUnclosedStreams();
|
||||
|
||||
BlockManagerFaultInjector.getInstance().mockJNStreams();
|
||||
|
||||
LOG.info("Catching up to latest edits from old active before " +
|
||||
"taking over writer role in edits logs");
|
||||
|
@ -283,7 +283,7 @@ public void stop() throws IOException {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public FSEditLog getEditLog() {
|
||||
FSEditLog getEditLog() {
|
||||
return editLog;
|
||||
}
|
||||
|
||||
@ -311,7 +311,7 @@ public Void run() throws Exception {
|
||||
startTime - lastLoadTimeMs);
|
||||
// It is already under the name system lock and the checkpointer
|
||||
// thread is already stopped. No need to acquire any other lock.
|
||||
editsTailed = doTailEdits(false);
|
||||
editsTailed = doTailEdits();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
@ -326,10 +326,6 @@ public Void run() throws Exception {
|
||||
|
||||
@VisibleForTesting
|
||||
public long doTailEdits() throws IOException, InterruptedException {
|
||||
return doTailEdits(true);
|
||||
}
|
||||
|
||||
private long doTailEdits(boolean onlyDurableTxns) throws IOException, InterruptedException {
|
||||
Collection<EditLogInputStream> streams;
|
||||
FSImage image = namesystem.getFSImage();
|
||||
|
||||
@ -338,7 +334,7 @@ private long doTailEdits(boolean onlyDurableTxns) throws IOException, Interrupte
|
||||
long startTime = timer.monotonicNow();
|
||||
try {
|
||||
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
|
||||
null, inProgressOk, onlyDurableTxns);
|
||||
null, inProgressOk, true);
|
||||
} catch (IOException ioe) {
|
||||
// This is acceptable. If we try to tail edits in the middle of an edits
|
||||
// log roll, i.e. the last one has been finalized but the new inprogress
|
||||
|
@ -1,107 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.qjournal.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
|
||||
/**
|
||||
* One Util class to mock QJuournals for some UTs not in this package.
|
||||
*/
|
||||
public final class SpyQJournalUtil {
|
||||
|
||||
private SpyQJournalUtil() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
|
||||
* @param conf input configuration.
|
||||
* @param uri input uri.
|
||||
* @param nsInfo input nameservice info.
|
||||
* @param nameServiceId input nameservice Id.
|
||||
* @return one mocked QuorumJournalManager.
|
||||
* @throws IOException throw IOException.
|
||||
*/
|
||||
public static QuorumJournalManager createSpyingQJM(Configuration conf,
|
||||
URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
|
||||
AsyncLogger.Factory spyFactory = (conf1, nsInfo1, journalId1, nameServiceId1, addr1) -> {
|
||||
AsyncLogger logger = new IPCLoggerChannel(conf1, nsInfo1, journalId1, nameServiceId1, addr1);
|
||||
return Mockito.spy(logger);
|
||||
};
|
||||
return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to mock one abnormal JournalNode with one empty response
|
||||
* for getJournaledEdits rpc with startTxid.
|
||||
* @param manager QuorumJournalmanager.
|
||||
* @param startTxid input StartTxid.
|
||||
*/
|
||||
public static void mockOneJNReturnEmptyResponse(
|
||||
QuorumJournalManager manager, long startTxid, int journalIndex) {
|
||||
List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
|
||||
|
||||
// Mock JN0 return an empty response.
|
||||
GetJournaledEditsResponseProto responseProto = GetJournaledEditsResponseProto
|
||||
.newBuilder().setTxnCount(journalIndex).build();
|
||||
ListenableFuture<GetJournaledEditsResponseProto> ret = Futures.immediateFuture(responseProto);
|
||||
Mockito.doReturn(ret).when(spies.get(journalIndex))
|
||||
.getJournaledEdits(eq(startTxid), eq(QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to mock one abnormal JournalNode with slow response for
|
||||
* getJournaledEdits rpc with startTxid.
|
||||
* @param manager input QuormJournalManager.
|
||||
* @param startTxid input start txid.
|
||||
* @param sleepTime sleep time.
|
||||
* @param journalIndex the journal index need to be mocked.
|
||||
*/
|
||||
public static void mockOneJNWithSlowResponse(
|
||||
QuorumJournalManager manager, long startTxid, int sleepTime, int journalIndex) {
|
||||
List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
|
||||
|
||||
ListeningExecutorService service = MoreExecutors.listeningDecorator(
|
||||
Executors.newSingleThreadExecutor());
|
||||
Mockito.doAnswer(invocation -> service.submit(() -> {
|
||||
Thread.sleep(sleepTime);
|
||||
ListenableFuture<?> future = null;
|
||||
try {
|
||||
future = (ListenableFuture<?>) invocation.callRealMethod();
|
||||
} catch (Throwable e) {
|
||||
fail("getJournaledEdits failed " + e.getMessage());
|
||||
}
|
||||
return future.get();
|
||||
})).when(spies.get(journalIndex))
|
||||
.getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
|
||||
}
|
||||
}
|
@ -1,142 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
|
||||
import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
public class TestHAWithInProgressTail {
|
||||
private MiniQJMHACluster qjmhaCluster;
|
||||
private MiniDFSCluster cluster;
|
||||
private MiniJournalCluster jnCluster;
|
||||
private NameNode nn0;
|
||||
private NameNode nn1;
|
||||
|
||||
@Before
|
||||
public void startUp() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 500);
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
|
||||
cluster = qjmhaCluster.getDfsCluster();
|
||||
jnCluster = qjmhaCluster.getJournalCluster();
|
||||
|
||||
// Get NameNode from cluster to future manual control
|
||||
nn0 = cluster.getNameNode(0);
|
||||
nn1 = cluster.getNameNode(1);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
if (qjmhaCluster != null) {
|
||||
qjmhaCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that Standby Node tails multiple segments while catching up
|
||||
* during the transition to Active.
|
||||
*/
|
||||
@Test
|
||||
public void testFailoverWithAbnormalJN() throws Exception {
|
||||
cluster.transitionToActive(0);
|
||||
cluster.waitActive(0);
|
||||
|
||||
BlockManagerFaultInjector.instance = new BlockManagerFaultInjector() {
|
||||
@Override
|
||||
public void mockJNStreams() throws IOException {
|
||||
spyOnJASjournal();
|
||||
}
|
||||
};
|
||||
|
||||
// Stop EditlogTailer in Standby NameNode.
|
||||
cluster.getNameNode(1).getNamesystem().getEditLogTailer().stop();
|
||||
|
||||
String p = "/testFailoverWhileTailingWithoutCache/";
|
||||
mkdirs(nn0, p + 0, p + 1, p + 2, p + 3, p + 4);
|
||||
mkdirs(nn0, p + 5, p + 6, p + 7, p + 8, p + 9);
|
||||
mkdirs(nn0, p + 10, p + 11, p + 12, p + 13, p + 14);
|
||||
|
||||
cluster.transitionToStandby(0);
|
||||
|
||||
cluster.transitionToActive(1);
|
||||
|
||||
// we should read them in nn1.
|
||||
waitForFileInfo(nn1, p + 0, p + 1, p + 14);
|
||||
}
|
||||
|
||||
private void spyOnJASjournal() throws IOException {
|
||||
JournalSet.JournalAndStream jas = nn1.getNamesystem().getEditLogTailer()
|
||||
.getEditLog().getJournalSet().getAllJournalStreams().get(0);
|
||||
|
||||
JournalManager oldManager = jas.getManager();
|
||||
oldManager.close();
|
||||
|
||||
// Create a SpyingQJM
|
||||
QuorumJournalManager manager = SpyQJournalUtil.createSpyingQJM(nn1.getConf(),
|
||||
jnCluster.getQuorumJournalURI("ns1"),
|
||||
nn1.getNamesystem().getNamespaceInfo(), "ns1");
|
||||
manager.recoverUnfinalizedSegments();
|
||||
jas.setJournalForTests(manager);
|
||||
|
||||
// First JournalNode with an empty response.
|
||||
SpyQJournalUtil.mockOneJNReturnEmptyResponse(manager, 1L, 0);
|
||||
// Second JournalNode with a slow response.
|
||||
SpyQJournalUtil.mockOneJNWithSlowResponse(manager, 1L, 3000, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the given directories on the provided NameNode.
|
||||
*/
|
||||
private static void mkdirs(NameNode nameNode, String... dirNames)
|
||||
throws Exception {
|
||||
for (String dirName : dirNames) {
|
||||
nameNode.getRpcServer().mkdirs(dirName,
|
||||
FsPermission.createImmutable((short) 0755), true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait up to 1 second until the given NameNode is aware of the existing of
|
||||
* all of the provided fileNames.
|
||||
*/
|
||||
private static void waitForFileInfo(NameNode nn, String... fileNames)
|
||||
throws Exception {
|
||||
for (String fileName : fileNames){
|
||||
assertNotNull(getFileInfo(nn, fileName, true, false, false));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user