HDFS-9787. SNNs stop uploading FSImage to ANN once isPrimaryCheckPointer changed to false. (Contributed by Guocui Mi)
This commit is contained in:
parent
96ea309431
commit
2536ece7b6
@ -935,6 +935,9 @@ Trunk (Unreleased)
|
|||||||
HDFS-9755. Erasure Coding: allow to use multiple EC policies in striping
|
HDFS-9755. Erasure Coding: allow to use multiple EC policies in striping
|
||||||
related tests [Part 2]. (Rui Li via zhz)
|
related tests [Part 2]. (Rui Li via zhz)
|
||||||
|
|
||||||
|
HDFS-9787. SNNs stop uploading FSImage to ANN once isPrimaryCheckPointer
|
||||||
|
changed to false. (Guocui Mi via vinayakumarb)
|
||||||
|
|
||||||
Release 2.9.0 - UNRELEASED
|
Release 2.9.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -65,6 +65,7 @@ public class StandbyCheckpointer {
|
|||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
private long lastCheckpointTime;
|
private long lastCheckpointTime;
|
||||||
|
private long lastUploadTime;
|
||||||
private final CheckpointerThread thread;
|
private final CheckpointerThread thread;
|
||||||
private final ThreadFactory uploadThreadFactory;
|
private final ThreadFactory uploadThreadFactory;
|
||||||
private List<URL> activeNNAddresses;
|
private List<URL> activeNNAddresses;
|
||||||
@ -252,6 +253,7 @@ public TransferFsImage.TransferResult call() throws IOException {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
lastUploadTime = monotonicNow();
|
||||||
|
|
||||||
// we are primary if we successfully updated the ANN
|
// we are primary if we successfully updated the ANN
|
||||||
this.isPrimaryCheckPointer = success;
|
this.isPrimaryCheckPointer = success;
|
||||||
@ -362,6 +364,7 @@ private void doWork() {
|
|||||||
// Reset checkpoint time so that we don't always checkpoint
|
// Reset checkpoint time so that we don't always checkpoint
|
||||||
// on startup.
|
// on startup.
|
||||||
lastCheckpointTime = monotonicNow();
|
lastCheckpointTime = monotonicNow();
|
||||||
|
lastUploadTime = monotonicNow();
|
||||||
while (shouldRun) {
|
while (shouldRun) {
|
||||||
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
|
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
|
||||||
if (!needRollbackCheckpoint) {
|
if (!needRollbackCheckpoint) {
|
||||||
@ -414,7 +417,9 @@ private void doWork() {
|
|||||||
|
|
||||||
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
|
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
|
||||||
// rollback request, are the checkpointer, are outside the quiet period.
|
// rollback request, are the checkpointer, are outside the quiet period.
|
||||||
boolean sendRequest = isPrimaryCheckPointer || secsSinceLast >= checkpointConf.getQuietPeriod();
|
final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
|
||||||
|
boolean sendRequest = isPrimaryCheckPointer
|
||||||
|
|| secsSinceLastUpload >= checkpointConf.getQuietPeriod();
|
||||||
doCheckpoint(sendRequest);
|
doCheckpoint(sendRequest);
|
||||||
|
|
||||||
// reset needRollbackCheckpoint to false only when we finish a ckpt
|
// reset needRollbackCheckpoint to false only when we finish a ckpt
|
||||||
|
@ -458,6 +458,48 @@ public void run() {
|
|||||||
t.join();
|
t.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the case standby NNs can upload FSImage to ANN after
|
||||||
|
* become non-primary standby NN. HDFS-9787
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testNonPrimarySBNUploadFSImage() throws Exception {
|
||||||
|
// Shutdown all standby NNs.
|
||||||
|
for (int i = 1; i < NUM_NNS; i++) {
|
||||||
|
cluster.shutdownNameNode(i);
|
||||||
|
|
||||||
|
// Checkpoint as fast as we can, in a tight loop.
|
||||||
|
cluster.getConfiguration(i).setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
doEdits(0, 10);
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
|
||||||
|
// Standby NNs do checkpoint without active NN available.
|
||||||
|
for (int i = 1; i < NUM_NNS; i++) {
|
||||||
|
cluster.restartNameNode(i, false);
|
||||||
|
}
|
||||||
|
cluster.waitClusterUp();
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_NNS; i++) {
|
||||||
|
// Once the standby catches up, it should do a checkpoint
|
||||||
|
// and save to local directories.
|
||||||
|
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
|
// Wait for 2 seconds to expire last upload time.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
doEdits(11, 20);
|
||||||
|
nns[0].getRpcServer().rollEditLog();
|
||||||
|
|
||||||
|
// One of standby NNs should also upload it back to the active.
|
||||||
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(23));
|
||||||
|
}
|
||||||
|
|
||||||
private void doEdits(int start, int stop) throws IOException {
|
private void doEdits(int start, int stop) throws IOException {
|
||||||
for (int i = start; i < stop; i++) {
|
for (int i = start; i < stop; i++) {
|
||||||
Path p = new Path("/test" + i);
|
Path p = new Path("/test" + i);
|
||||||
|
Loading…
Reference in New Issue
Block a user