HDFS-16659. JournalNode should throw NewerTxnIdException when SinceTxId is bigger than HighestWrittenTxId (#4560)

Co-authored-by: Zander Xu <zengqiang.xu@shopee.com>
Signed-off-by: Erik Krogen <xkrogen@apache.org>
This commit is contained in:
ZanderXu 2022-09-07 01:12:55 +08:00 committed by GitHub
parent 7bcf853ff4
commit c947c326e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 97 additions and 4 deletions

View File

@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.qjournal.server.NewerTxnIdException;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -523,6 +524,9 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
streams.addAll(rpcStreams);
return;
} catch (NewerTxnIdException ntie) {
// normal situation, we requested newer IDs than any journal has. no new streams
return;
} catch (IOException ioe) {
LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
" via RPC; falling back to streaming.", ioe);

View File

@ -750,10 +750,13 @@ public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
"is a requirement to fetch journaled edits via RPC. Please enable " +
"it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
}
if (sinceTxId > getHighestWrittenTxId()) {
// Requested edits that don't exist yet; short-circuit the cache here
long highestTxId = getHighestWrittenTxId();
if (sinceTxId > highestTxId) {
// Requested edits that don't exist yet and is newer than highestTxId.
metrics.rpcEmptyResponses.incr();
return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
throw new NewerTxnIdException(
"Highest txn ID available in the journal is %d, but requested txns starting at %d.",
highestTxId, sinceTxId);
}
try {
List<ByteBuffer> buffers = new ArrayList<>();

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;
import java.io.IOException;
/**
* Exception when no edits are available.
*/
public class NewerTxnIdException extends IOException {
private static final long serialVersionUID = 0L;
public NewerTxnIdException(String msgFormat, Object... msgArgs) {
super(String.format(msgFormat, msgArgs));
}
}

View File

@ -28,7 +28,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@ -40,11 +40,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.net.MockDomainNameResolver;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture;
import org.apache.hadoop.util.Lists;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -53,6 +57,7 @@
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@ -1101,6 +1106,56 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
}
}
/**
* Test selecting EditLogInputStream after some journalNode jitter.
* Suppose there are 3 journalNodes, JN0 ~ JN2.
* 1. JN0 has some abnormal cases when Active Namenode is syncing 10 Edits with first txid 11.
* 2. NameNode just ignore the abnormal JN0 and continue to sync Edits to Journal 1 and 2.
* 3. JN0 backed to health.
* 4. NameNode continue sync 10 Edits with first txid 21.
* 5. At this point, there are no Edits 11 ~ 30 in the cache of JN0.
* 6. Observer NameNode try to select EditLogInputStream through
* getJournaledEdits with since txId 21.
* 7. JN2 has some abnormal cases and caused a slow response.
*/
@Test
public void testSelectViaRPCAfterJNJitter() throws Exception {
EditLogOutputStream stm = qjm.startLogSegment(
1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
SettableFuture<Void> slowLog = SettableFuture.create();
Mockito.doReturn(slowLog).when(spies.get(0))
.sendEdits(eq(1L), eq(11L), eq(10), Mockito.any());
// Successfully write these edits to JN0 ~ JN2
writeTxns(stm, 1, 10);
// Failed write these edits to JN0, but successfully write them to JN1 ~ JN2
writeTxns(stm, 11, 10);
// Successfully write these edits to JN1 ~ JN2
writeTxns(stm, 21, 20);
Semaphore semaphore = new Semaphore(0);
spyGetJournaledEdits(0, 21, () -> semaphore.release(1));
spyGetJournaledEdits(1, 21, () -> semaphore.release(1));
spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2));
List<EditLogInputStream> streams = new ArrayList<>();
qjm.selectInputStreams(streams, 21, true, true);
assertEquals(1, streams.size());
assertEquals(21, streams.get(0).getFirstTxId());
assertEquals(40, streams.get(0).getLastTxId());
}
private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) {
Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
preHook.run();
@SuppressWarnings("unchecked")
ListenableFuture<GetJournaledEditsResponseProto> result =
(ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
return result;
}).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
}
@Test
public void testSelectViaRpcAfterJNRestart() throws Exception {
EditLogOutputStream stm =