HDFS-4238. Standby namenode should not do purging of shared storage edits. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1417651 13f79535-47bb-0310-9956-ffa450edef68
parent bd239a8d97
commit adb8941cc2
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -561,6 +561,9 @@ Release 2.0.3-alpha - Unreleased
     children of the child have to be updated to the new child. (Jing Zhao
     via szetszwo)
 
+    HDFS-4238. Standby namenode should not do purging of shared
+    storage edits. (todd)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -878,6 +878,11 @@ synchronized public JournalSet getJournalSet() {
     return journalSet;
   }
+
+  @VisibleForTesting
+  synchronized void setJournalSetForTesting(JournalSet js) {
+    this.journalSet = js;
+  }
 
   /**
    * Used only by tests.
    */
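The new setter is a deliberately narrow test seam: package-private, synchronized like the rest of FSEditLog, and marked with Guava's @VisibleForTesting, which documents intent but has no runtime effect. A minimal sketch of the same pattern, using hypothetical class names (Engine/Journal are stand-ins, not Hadoop classes):

    import com.google.common.annotations.VisibleForTesting;

    class Journal { }

    class Engine {
      private Journal journal = new Journal();   // real collaborator

      @VisibleForTesting   // documentation only; nothing is enforced at runtime
      synchronized void setJournalForTesting(Journal j) {
        this.journal = j;  // a test can swap in a Mockito spy here
      }
    }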
@@ -1031,9 +1036,18 @@ synchronized void abortCurrentLogSegment() {
 
   /**
    * Archive any log files that are older than the given txid.
+   *
+   * If the edit log is not open for write, then this call returns with no
+   * effect.
    */
   @Override
   public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
+    // Should not purge logs unless they are open for write.
+    // This prevents the SBN from purging logs on shared storage, for example.
+    if (!isOpenForWrite()) {
+      return;
+    }
+
     assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
       minTxIdToKeep <= curSegmentTxId :
       "cannot purge logs older than txid " + minTxIdToKeep +
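The early return is the core fix: in an HA pair both NameNodes run an FSEditLog over the same shared edits directory, but only the active holds it open for write, so gating purgeLogsOlderThan() on isOpenForWrite() turns the standby's purge attempts into silent no-ops. A self-contained sketch of the guard pattern under simplified, assumed types (not the Hadoop source):

    import java.util.TreeMap;

    // Simplified model: a log that refuses to purge unless it is the writer.
    class SharedLog {
      // segment start txid -> segment file name
      private final TreeMap<Long, String> segments = new TreeMap<>();
      private boolean openForWrite = false;

      void open() { openForWrite = true; }

      void purgeLogsOlderThan(long minTxIdToKeep) {
        if (!openForWrite) {
          return; // a reader (e.g. the standby) must never delete shared files
        }
        // drop segments that start before minTxIdToKeep
        segments.headMap(minTxIdToKeep).clear();
      }
    }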
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -179,6 +179,13 @@ public static FSImage spyOnFsImage(NameNode nn1) {
     return spy;
   }
+
+  public static JournalSet spyOnJournalSet(NameNode nn) {
+    FSEditLog editLog = nn.getFSImage().getEditLog();
+    JournalSet js = Mockito.spy(editLog.getJournalSet());
+    editLog.setJournalSetForTesting(js);
+    return js;
+  }
 
   public static String getMkdirOpPath(FSEditLogOp op) {
     if (op.opCode == FSEditLogOpCodes.OP_MKDIR) {
       return ((MkdirOp) op).path;
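spyOnJournalSet() works because a Mockito spy wraps the real JournalSet, delegating every call while recording it, and the seam added above installs the wrapper back into the edit log. A self-contained sketch of the spy-then-verify flow on a plain collection (assumes only Mockito on the classpath):

    import static org.mockito.Mockito.*;
    import java.util.ArrayList;
    import java.util.List;

    public class SpySketch {
      public static void main(String[] args) {
        List<String> spied = spy(new ArrayList<String>());
        spied.add("txn");               // real behavior still runs
        verify(spied, never()).clear(); // passes: clear() was never called
      }
    }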
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -66,6 +67,12 @@ public void setupCluster() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+    // Dial down the retention of extra edits and checkpoints. This is to
+    // help catch regressions of HDFS-4238 (SBN should not purge shared edits)
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
+
     conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
     conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
         SlowCodec.class.getCanonicalName());
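Retaining one checkpoint and zero extra edits makes retention maximally aggressive, so a wrongly triggered purge would actually delete segments the other node still needs; that is what lets the test catch the regression. A rough sketch of the arithmetic those two keys control (simplified from Hadoop's retention manager, so treat the exact policy as an assumption):

    class RetentionSketch {
      public static void main(String[] args) {
        long oldestRetainedCheckpointTxId = 12; // only one checkpoint kept
        long numExtraEditsRetained = 0;         // ..._NUM_EXTRA_EDITS_RETAINED_KEY
        long minTxIdToKeep = oldestRetainedCheckpointTxId - numExtraEditsRetained;
        // With the margin at zero, every edit segment below the checkpoint
        // txid becomes purgeable the moment a checkpoint lands.
        System.out.println("purgeLogsOlderThan(" + minTxIdToKeep + ")"); // -> 12
      }
    }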
@@ -99,15 +106,20 @@ public void shutdownCluster() throws IOException {
 
   @Test
   public void testSBNCheckpoints() throws Exception {
-    doEdits(0, 10);
+    JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nn1);
+
+    doEdits(0, 10);
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
     // Once the standby catches up, it should notice that it needs to
     // do a checkpoint and save one to its local directories.
-    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12));
+    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
+
     // It should also upload it back to the active.
-    HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12));
+    HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
+
+    // The standby should never try to purge edit logs on shared storage.
+    Mockito.verify(standbyJournalSet, Mockito.never()).
+      purgeLogsOlderThan(Mockito.anyLong());
   }
 
   /**
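The waitForCheckpoint calls are condition polls: retry a predicate until it holds or a deadline passes. HATestUtil's actual implementation is not part of this diff, so the following is only a generic sketch of that shape, with hypothetical names:

    import java.util.function.BooleanSupplier;

    // Generic poll-until-true helper (hypothetical; not HATestUtil's code).
    class WaitUtil {
      static void waitFor(BooleanSupplier cond, long timeoutMs) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!cond.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("condition not met in " + timeoutMs + "ms");
          }
          Thread.sleep(100); // poll interval
        }
      }
    }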
@@ -129,8 +141,8 @@ public void testBothNodesInStandbyState() throws Exception {
     // so the standby will catch up. Then, both will be in standby mode
     // with enough uncheckpointed txns to cause a checkpoint, and they
     // will each try to take a checkpoint and upload to each other.
-    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12));
-    HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12));
+    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
+    HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
 
     assertEquals(12, nn0.getNamesystem().getFSImage()
       .getMostRecentCheckpointTxId());