HDFS-4128. 2NN gets stuck in inconsistent state if edit log replay fails in the middle (kihwal via daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1452384 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent ec13f1eb3a
commit cfa86e6110
@@ -2317,6 +2317,9 @@ Release 0.23.7 - UNRELEASED
     HDFS-4495. Allow client-side lease renewal to be retried beyond soft-limit
     (kihwal)
 
+    HDFS-4128. 2NN gets stuck in inconsistent state if edit log replay fails
+    in the middle (kihwal via daryn)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -121,6 +121,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT = 3600;
   public static final String DFS_NAMENODE_CHECKPOINT_TXNS_KEY = "dfs.namenode.checkpoint.txns";
   public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
+  public static final String DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = "dfs.namenode.checkpoint.max-retries";
+  public static final int DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
   public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
   public static final int DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
   public static final String DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";
@@ -39,6 +39,8 @@ public class CheckpointConf {
   /** checkpoint once every this many transactions, regardless of time */
   private final long checkpointTxnCount;
 
+  /** maximum number of retries when merge errors occur */
+  private final int maxRetriesOnMergeError;
 
   public CheckpointConf(Configuration conf) {
     checkpointCheckPeriod = conf.getLong(
@@ -49,6 +51,8 @@ public CheckpointConf(Configuration conf) {
         DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
     checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
         DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+    maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY,
+        DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT);
     warnForDeprecatedConfigs(conf);
   }
 
@@ -75,4 +79,8 @@ public long getCheckPeriod() {
   public long getTxnCount() {
     return checkpointTxnCount;
   }
+
+  public int getMaxRetriesOnMergeError() {
+    return maxRetriesOnMergeError;
+  }
 }
@@ -33,6 +33,7 @@ static CheckpointFaultInjector getInstance() {
 
   public void beforeGetImageSetsHeaders() throws IOException {}
   public void afterSecondaryCallsRollEditLog() throws IOException {}
+  public void duringMerge() throws IOException {}
   public void afterSecondaryUploadsNewImage() throws IOException {}
   public void aboutToSendFile(File localfile) throws IOException {}
 
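The new duringMerge() hook follows the existing fault-injector pattern: SecondaryNameNode calls CheckpointFaultInjector.getInstance().duringMerge(), which is a no-op by default, and the tests further down replace the singleton with a Mockito mock so the call can be made to throw. A minimal sketch of that wiring, assuming the injector exposes its singleton through a package-private static "instance" field and that the sketch lives in the same package (only getInstance() is visible in this hunk):

import java.io.IOException;
import org.mockito.Mockito;

class FaultInjectorWiringSketch {
  static CheckpointFaultInjector injectMergeFailure() throws IOException {
    // Replace the default no-op injector with a Mockito mock.
    // The static "instance" field is an assumption, not shown in this diff.
    CheckpointFaultInjector faultInjector =
        Mockito.mock(CheckpointFaultInjector.class);
    CheckpointFaultInjector.instance = faultInjector;

    // Make the next merge fail exactly where doMerge() calls
    // getInstance().duringMerge().
    Mockito.doThrow(new IOException("Injecting failure during merge"))
        .when(faultInjector).duringMerge();
    return faultInjector;
  }
}

The TestCheckpoint changes below use exactly this stubbing, and call Mockito.reset(faultInjector) afterwards to restore the no-op behavior.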
@@ -144,6 +144,11 @@ FSImage getFSImage() {
     return checkpointImage;
   }
 
+  @VisibleForTesting
+  int getMergeErrorCount() {
+    return checkpointImage.getMergeErrorCount();
+  }
+
   @VisibleForTesting
   FSNamesystem getFSNamesystem() {
     return namesystem;
@@ -339,6 +344,7 @@ public void doWork() {
     // number of transactions in the edit log that haven't yet been checkpointed.
     //
     long period = checkpointConf.getCheckPeriod();
+    int maxRetries = checkpointConf.getMaxRetriesOnMergeError();
 
     while (shouldRun) {
       try {
@@ -364,6 +370,13 @@ public void doWork() {
       } catch (IOException e) {
         LOG.error("Exception in doCheckpoint", e);
         e.printStackTrace();
+        // Prevent a huge number of edits from being created due to
+        // unrecoverable conditions and endless retries.
+        if (checkpointImage.getMergeErrorCount() > maxRetries) {
+          LOG.fatal("Merging failed " +
+              checkpointImage.getMergeErrorCount() + " times.");
+          terminate(1);
+        }
       } catch (Throwable e) {
         LOG.fatal("Throwable Exception in doCheckpoint", e);
         e.printStackTrace();
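Once the merge error count exceeds maxRetries, the 2NN gives up and calls terminate(1). Judging by the test changes below, which import org.apache.hadoop.util.ExitUtil and catch ExitUtil.ExitException out of doWork(), this is the ExitUtil-based exit path, so a test can turn the JVM exit into a catchable exception. A minimal sketch of that pattern, presented as an assumption about the ExitUtil API rather than part of this patch:

import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ExitUtil.ExitException;

class TerminateSketch {
  static void demo() {
    // With system exits disabled, terminate(status) records the first exit
    // and throws ExitException instead of killing the JVM.
    ExitUtil.disableSystemExit();
    try {
      ExitUtil.terminate(1);   // what the 2NN does after too many merge failures
    } catch (ExitException ee) {
      // A test can assert on the recorded exit, then clear it for later tests.
      ExitUtil.resetFirstExitException();
    }
  }
}

This also appears to be why the second new test builds its MiniDFSCluster with checkExitOnShutdown(false): the injected failures are expected to end in a terminate call.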
@@ -498,9 +511,21 @@ boolean doCheckpoint() throws IOException {
     RemoteEditLogManifest manifest =
       namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
 
+    // Fetch fsimage and edits. Reload the image if previous merge failed.
     loadImage |= downloadCheckpointFiles(
-        fsName, checkpointImage, sig, manifest);   // Fetch fsimage and edits
-    doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
+        fsName, checkpointImage, sig, manifest) |
+        checkpointImage.hasMergeError();
+    try {
+      doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
+    } catch (IOException ioe) {
+      // A merge error occurred. The in-memory file system state may be
+      // inconsistent, so the image and edits need to be reloaded.
+      checkpointImage.setMergeError();
+      throw ioe;
+    }
+    // Clear any error since merge was successful.
+    checkpointImage.clearMergeError();
+
 
     //
     // Upload the new image into the NameNode. Then tell the Namenode
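Two details of this hunk are easy to miss. The single "|" is a non-short-circuiting OR, so downloadCheckpointFiles(...) always runs and its result is then combined with hasMergeError(); a failure recorded by a previous merge therefore forces loadImage to true on the next attempt even if nothing new was downloaded. The error flag itself is a counter with a small lifecycle, implemented by the new CheckpointStorage methods further down. An illustrative restatement of that lifecycle (a sketch mirroring the patch, not the actual SecondaryNameNode code):

import java.io.IOException;

class MergeErrorTrackerSketch {
  private int mergeErrorCount;

  boolean hasMergeError()  { return mergeErrorCount > 0; }
  int getMergeErrorCount() { return mergeErrorCount; }
  void setMergeError()     { mergeErrorCount++; }
  void clearMergeError()   { mergeErrorCount = 0; }

  // One checkpoint attempt: a failed merge records the error so the next
  // attempt reloads the image; a successful merge clears it.
  void checkpointOnce(boolean downloadedNewImage) throws IOException {
    boolean loadImage = downloadedNewImage | hasMergeError();
    try {
      merge(loadImage);            // stands in for doMerge(...)
    } catch (IOException ioe) {
      setMergeError();             // in-memory state may be inconsistent
      throw ioe;
    }
    clearMergeError();
  }

  void merge(boolean reloadImage) throws IOException { /* placeholder */ }
}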
@@ -754,6 +779,7 @@ private static CommandLineOpts parseArgs(String[] argv) {
 
   static class CheckpointStorage extends FSImage {
 
+    private int mergeErrorCount;
     private static class CheckpointLogPurger implements LogsPurgeable {
 
       private NNStorage storage;
@@ -815,6 +841,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
       // we shouldn't have any editLog instance. Setting to null
       // makes sure we don't accidentally depend on it.
       editLog = null;
+      mergeErrorCount = 0;
 
       // Replace the archival manager with one that can actually work on the
       // 2NN's edits storage.
@@ -881,7 +908,24 @@ void recoverCreate(boolean format) throws IOException {
         }
       }
     }
+
+    boolean hasMergeError() {
+      return (mergeErrorCount > 0);
+    }
+
+    int getMergeErrorCount() {
+      return mergeErrorCount;
+    }
+
+    void setMergeError() {
+      mergeErrorCount++;
+    }
+
+    void clearMergeError() {
+      mergeErrorCount = 0;
+    }
 
     /**
      * Ensure that the current/ directory exists in all storage
     * directories
@@ -915,7 +959,9 @@ static void doMerge(
       dstImage.reloadFromImageFile(file, dstNamesystem);
       dstNamesystem.dir.imageLoadComplete();
     }
+    // error simulation code for junit test
+    CheckpointFaultInjector.getInstance().duringMerge();
 
     Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
     // The following has the side effect of purging old fsimages/edit logs.
     dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());
@@ -640,6 +640,15 @@
 </description>
 </property>
 
+<property>
+  <name>dfs.namenode.checkpoint.max-retries</name>
+  <value>3</value>
+  <description>The SecondaryNameNode retries failed checkpointing. If the
+    failure occurs while loading fsimage or replaying edits, the number of
+    retries is limited by this variable.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.num.checkpoints.retained</name>
   <value>2</value>
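The default of 3 lives in hdfs-default.xml; a deployment would normally override it in hdfs-site.xml, and the new test below sets it programmatically through DFSConfigKeys. A minimal sketch of the programmatic form, assuming a standard client-side Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

class CheckpointRetryConfigSketch {
  static Configuration singleRetryConf() {
    // Allow only one retry before the 2NN gives up on a failing merge.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, 1);
    return conf;
  }
}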
@@ -74,6 +74,8 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -226,6 +228,111 @@ public void testWriteTransactionIdHandlesIOE() throws Exception {
         toString().indexOf("storageDirToCheck") != -1);
   }
 
+  /*
+   * Simulate exception during edit replay.
+   */
+  @Test(timeout=5000)
+  public void testReloadOnEditReplayFailure () throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    FSDataOutputStream fos = null;
+    SecondaryNameNode secondary = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+          .build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      secondary = startSecondaryNameNode(conf);
+      fos = fs.create(new Path("tmpfile0"));
+      fos.write(new byte[] { 0, 1, 2, 3 });
+      secondary.doCheckpoint();
+      fos.write(new byte[] { 0, 1, 2, 3 });
+      fos.hsync();
+
+      // Cause merge to fail in next checkpoint.
+      Mockito.doThrow(new IOException(
+          "Injecting failure during merge"))
+          .when(faultInjector).duringMerge();
+
+      try {
+        secondary.doCheckpoint();
+        fail("Fault injection failed.");
+      } catch (IOException ioe) {
+        // This is expected.
+      }
+      Mockito.reset(faultInjector);
+
+      // The error must be recorded, so next checkpoint will reload image.
+      fos.write(new byte[] { 0, 1, 2, 3 });
+      fos.hsync();
+
+      assertTrue("Another checkpoint should have reloaded image",
+          secondary.doCheckpoint());
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      Mockito.reset(faultInjector);
+    }
+  }
+
+  /*
+   * Simulate 2NN exit due to too many merge failures.
+   */
+  @Test(timeout=10000)
+  public void testTooManyEditReplayFailures() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, "1");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, "1");
+
+    FSDataOutputStream fos = null;
+    SecondaryNameNode secondary = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+          .checkExitOnShutdown(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      fos = fs.create(new Path("tmpfile0"));
+      fos.write(new byte[] { 0, 1, 2, 3 });
+
+      // Cause merge to fail in next checkpoint.
+      Mockito.doThrow(new IOException(
+          "Injecting failure during merge"))
+          .when(faultInjector).duringMerge();
+
+      secondary = startSecondaryNameNode(conf);
+      secondary.doWork();
+      // Fail if we get here.
+      fail("2NN did not exit.");
+    } catch (ExitException ee) {
+      // ignore
+      ExitUtil.resetFirstExitException();
+      assertEquals("Max retries", 1, secondary.getMergeErrorCount() - 1);
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      Mockito.reset(faultInjector);
+    }
+  }
+
   /*
    * Simulate namenode crashing after rolling edit log.
    */