HDFS-14317. Ensure checkpoints are created when in-progress edit log tailing is enabled with a period shorter than the log roll period. Contributed by Ekanth Sethuramalingam.
This commit is contained in:
parent
475011bbf8
commit
1bc282e0b3
@ -331,7 +331,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final int DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4;
|
||||
|
||||
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
|
||||
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
|
||||
public static final float
|
||||
DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 0.5f;
|
||||
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
|
||||
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
|
||||
|
||||
|
@ -563,7 +563,8 @@ long getLastWrittenTxIdWithoutLock() {
|
||||
/**
|
||||
* @return the first transaction ID in the current log segment
|
||||
*/
|
||||
synchronized long getCurSegmentTxId() {
|
||||
@VisibleForTesting
|
||||
public synchronized long getCurSegmentTxId() {
|
||||
Preconditions.checkState(isSegmentOpen(),
|
||||
"Bad state: %s", state);
|
||||
return curSegmentTxId;
|
||||
|
@ -110,6 +110,11 @@ public class EditLogTailer {
|
||||
*/
|
||||
private long lastLoadTimeMs;
|
||||
|
||||
/**
|
||||
* The last time we triggered a edit log roll on active namenode.
|
||||
*/
|
||||
private long lastRollTimeMs;
|
||||
|
||||
/**
|
||||
* How often the Standby should roll edit logs. Since the Standby only reads
|
||||
* from finalized log segments, the Standby will only be as up-to-date as how
|
||||
@ -140,7 +145,8 @@ public class EditLogTailer {
|
||||
private int nnLoopCount = 0;
|
||||
|
||||
/**
|
||||
* maximum number of retries we should give each of the remote namenodes before giving up
|
||||
* Maximum number of retries we should give each of the remote namenodes
|
||||
* before giving up.
|
||||
*/
|
||||
private int maxRetries;
|
||||
|
||||
@ -166,6 +172,7 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
|
||||
this.editLog = namesystem.getEditLog();
|
||||
|
||||
lastLoadTimeMs = monotonicNow();
|
||||
lastRollTimeMs = monotonicNow();
|
||||
|
||||
logRollPeriodMs = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
|
||||
@ -354,7 +361,7 @@ public long getLastLoadTimeMs() {
|
||||
*/
|
||||
private boolean tooLongSinceLastLoad() {
|
||||
return logRollPeriodMs >= 0 &&
|
||||
(monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
|
||||
(monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -382,6 +389,7 @@ void triggerActiveLogRoll() {
|
||||
try {
|
||||
future = rollEditsRpcExecutor.submit(getNameNodeProxy());
|
||||
future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
lastRollTimeMs = monotonicNow();
|
||||
lastRollTriggerTxId = lastLoadedTxnId;
|
||||
} catch (ExecutionException e) {
|
||||
LOG.warn("Unable to trigger a roll of the active NN", e);
|
||||
|
@ -2614,7 +2614,7 @@
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.edit.log.autoroll.multiplier.threshold</name>
|
||||
<value>2.0</value>
|
||||
<value>0.5</value>
|
||||
<description>
|
||||
Determines when an active namenode will roll its own edit log.
|
||||
The actual threshold (in number of edits) is determined by multiplying
|
||||
|
@ -338,9 +338,97 @@ protected Void doWork() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
|
||||
throws Exception {
|
||||
// Time in seconds to wait for standby to catch up to edits from active
|
||||
final int standbyCatchupWaitTime = 2;
|
||||
// Time in seconds to wait before checking if edit logs are rolled while
|
||||
// expecting no edit log roll
|
||||
final int noLogRollWaitTime = 2;
|
||||
// Time in seconds to wait before checking if edit logs are rolled while
|
||||
// expecting edit log roll
|
||||
final int logRollWaitTime = 3;
|
||||
|
||||
Configuration conf = getConf();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
|
||||
standbyCatchupWaitTime + noLogRollWaitTime + 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
||||
|
||||
MiniDFSCluster cluster = createMiniDFSCluster(conf, 2);
|
||||
if (cluster == null) {
|
||||
fail("failed to start mini cluster.");
|
||||
}
|
||||
|
||||
try {
|
||||
int activeIndex = new Random().nextBoolean() ? 1 : 0;
|
||||
int standbyIndex = (activeIndex == 0) ? 1 : 0;
|
||||
cluster.transitionToActive(activeIndex);
|
||||
NameNode active = cluster.getNameNode(activeIndex);
|
||||
NameNode standby = cluster.getNameNode(standbyIndex);
|
||||
|
||||
long origTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||
.getCurSegmentTxId();
|
||||
for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
|
||||
NameNodeAdapter.mkdirs(active, getDirPath(i),
|
||||
new PermissionStatus("test", "test",
|
||||
new FsPermission((short)00755)), true);
|
||||
}
|
||||
|
||||
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||
.getLastWrittenTxId();
|
||||
waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId,
|
||||
standbyCatchupWaitTime);
|
||||
|
||||
for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
|
||||
NameNodeAdapter.mkdirs(active, getDirPath(i),
|
||||
new PermissionStatus("test", "test",
|
||||
new FsPermission((short)00755)), true);
|
||||
}
|
||||
|
||||
boolean exceptionThrown = false;
|
||||
try {
|
||||
checkForLogRoll(active, origTxId, noLogRollWaitTime);
|
||||
} catch (TimeoutException e) {
|
||||
exceptionThrown = true;
|
||||
}
|
||||
assertTrue(exceptionThrown);
|
||||
|
||||
checkForLogRoll(active, origTxId, logRollWaitTime);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private static void waitForStandbyToCatchUpWithInProgressEdits(
|
||||
final NameNode standby, final long activeTxId,
|
||||
int maxWaitSec) throws Exception {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
long standbyTxId = standby.getNamesystem().getFSImage()
|
||||
.getLastAppliedTxId();
|
||||
return (standbyTxId >= activeTxId);
|
||||
}
|
||||
}, 100, maxWaitSec * 1000);
|
||||
}
|
||||
|
||||
private static void checkForLogRoll(final NameNode active,
|
||||
final long origTxId, int maxWaitSec) throws Exception {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||
.getCurSegmentTxId();
|
||||
return (origTxId != curSegmentTxId);
|
||||
}
|
||||
}, 100, maxWaitSec * 1000);
|
||||
}
|
||||
|
||||
private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
|
||||
int nnCount) throws IOException {
|
||||
int basePort = 10060 + new Random().nextInt(100) * 2;
|
||||
int basePort = 10060 + new Random().nextInt(1000) * 2;
|
||||
|
||||
// By passing in basePort, name node will have IPC port set,
|
||||
// which is needed for enabling roll log.
|
||||
|
@ -254,10 +254,10 @@ public void testCheckpointStartingMidEditsFile() throws Exception {
|
||||
|
||||
// Once the standby catches up, it should notice that it needs to
|
||||
// do a checkpoint and save one to its local directories.
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3));
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 5));
|
||||
|
||||
// It should also upload it back to the active.
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3));
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5));
|
||||
|
||||
causeFailureOnEditLogRead();
|
||||
|
||||
@ -272,15 +272,15 @@ public void testCheckpointStartingMidEditsFile() throws Exception {
|
||||
}
|
||||
|
||||
// 5 because we should get OP_START_LOG_SEGMENT and one successful OP_MKDIR
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3, 5));
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 5, 7));
|
||||
|
||||
// It should also upload it back to the active.
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5));
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5, 7));
|
||||
|
||||
// Restart the active NN
|
||||
cluster.restartNameNode(0);
|
||||
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5));
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5, 7));
|
||||
|
||||
FileSystem fs0 = null;
|
||||
try {
|
||||
@ -309,7 +309,7 @@ public void testFailureToReadEditsOnTransitionToActive() throws Exception {
|
||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
|
||||
// It should also upload it back to the active.
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3));
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5));
|
||||
|
||||
causeFailureOnEditLogRead();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user