HDFS-2824. Fix failover when prior NN died just after creating an edit log segment. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1238069 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-01-30 23:05:18 +00:00
parent 6884348444
commit 641f79a325
12 changed files with 207 additions and 73 deletions

View File

@ -139,3 +139,5 @@ HDFS-2805. Add a test for a federated cluster with HA NNs. (Brandon Li via jiten
HDFS-2841. HAAdmin does not work if security is enabled. (atm)
HDFS-2691. Fixes for pipeline recovery in an HA cluster: report RBW replicas immediately upon pipeline creation. (todd)
HDFS-2824. Fix failover when prior NN died just after creating an edit log segment. (atm via todd)

View File

@ -168,11 +168,11 @@ static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOExc
try {
in = new EditLogFileInputStream(file);
} catch (LogHeaderCorruptException corrupt) {
// If it's missing its header, this is equivalent to no transactions
// If the header is malformed or the wrong value, this indicates a corruption
FSImage.LOG.warn("Log at " + file + " has no valid header",
corrupt);
return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID,
HdfsConstants.INVALID_TXID);
return new FSEditLogLoader.EditLogValidation(0,
HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true);
}
try {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -27,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
@ -36,7 +38,8 @@
* An implementation of the abstract class {@link EditLogOutputStream}, which
* stores edits in a local file.
*/
class EditLogFileOutputStream extends EditLogOutputStream {
@InterfaceAudience.Private
public class EditLogFileOutputStream extends EditLogOutputStream {
private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
private File file;
@ -96,11 +99,23 @@ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
public void create() throws IOException {
fc.truncate(0);
fc.position(0);
doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION);
writeHeader(doubleBuf.getCurrentBuf());
setReadyToFlush();
flush();
}
/**
* Write header information for this EditLogFileOutputStream to the provided
* DataOutputSream.
*
* @param out the output stream to write the header to.
* @throws IOException in the event of error writing to the stream.
*/
@VisibleForTesting
public static void writeHeader(DataOutputStream out) throws IOException {
out.writeInt(HdfsConstants.LAYOUT_VERSION);
}
@Override
public void close() throws IOException {
if (fp == null) {

View File

@ -605,19 +605,21 @@ static EditLogValidation validateEditLog(EditLogInputStream in) {
FSImage.LOG.debug("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length.", t);
}
return new EditLogValidation(lastPos, firstTxId, lastTxId);
return new EditLogValidation(lastPos, firstTxId, lastTxId, false);
}
static class EditLogValidation {
private long validLength;
private long startTxId;
private long endTxId;
private final long validLength;
private final long startTxId;
private final long endTxId;
private final boolean corruptionDetected;
EditLogValidation(long validLength,
long startTxId, long endTxId) {
EditLogValidation(long validLength, long startTxId, long endTxId,
boolean corruptionDetected) {
this.validLength = validLength;
this.startTxId = startTxId;
this.endTxId = endTxId;
this.corruptionDetected = corruptionDetected;
}
long getValidLength() { return validLength; }
@ -633,6 +635,8 @@ long getNumTransactions() {
}
return (endTxId - startTxId) + 1;
}
boolean hasCorruptHeader() { return corruptionDetected; }
}
/**

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@ -61,7 +60,6 @@ class FileJournalManager implements JournalManager {
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
private File currentInProgress = null;
private long maxSeenTransaction = 0L;
@VisibleForTesting
StoragePurger purger
@ -143,7 +141,7 @@ List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
allLogFiles.size());
for (EditLogFile elf : allLogFiles) {
if (elf.isCorrupt() || elf.isInProgress()) continue;
if (elf.hasCorruptHeader() || elf.isInProgress()) continue;
if (elf.getFirstTxId() >= firstTxId) {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
} else if ((firstTxId > elf.getFirstTxId()) &&
@ -244,7 +242,7 @@ public long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
elf.validateLog();
}
if (elf.isCorrupt()) {
if (elf.hasCorruptHeader()) {
break;
}
numTxns += elf.getLastTxId() + 1 - fromTxId;
@ -282,21 +280,37 @@ synchronized public void recoverUnfinalizedSegments() throws IOException {
LOG.info("Recovering unfinalized segments in " + currentDir);
List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
// make sure journal is aware of max seen transaction before moving corrupt
// files aside
findMaxTransaction(true);
for (EditLogFile elf : allLogFiles) {
if (elf.getFile().equals(currentInProgress)) {
continue;
}
if (elf.isInProgress()) {
elf.validateLog();
if (elf.isCorrupt()) {
elf.moveAsideCorruptFile();
// If the file is zero-length, we likely just crashed after opening the
// file, but before writing anything to it. Safe to delete it.
if (elf.getFile().length() == 0) {
LOG.info("Deleting zero-length edit log file " + elf);
elf.getFile().delete();
continue;
}
elf.validateLog();
if (elf.hasCorruptHeader()) {
elf.moveAsideCorruptFile();
throw new CorruptionException("In-progress edit log file is corrupt: "
+ elf);
}
// If the file has a valid header (isn't corrupt) but contains no
// transactions, we likely just crashed after opening the file and
// writing the header, but before syncing any transactions. Safe to
// delete the file.
if (elf.getNumTransactions() == 0) {
LOG.info("Deleting edit log file with zero transactions " + elf);
elf.getFile().delete();
continue;
}
finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
}
}
@ -321,15 +335,21 @@ private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
/**
* Find the maximum transaction in the journal.
* This gets stored in a member variable, as corrupt edit logs
* will be moved aside, but we still need to remember their first
* tranaction id in the case that it was the maximum transaction in
* the journal.
*/
private long findMaxTransaction(boolean inProgressOk)
throws IOException {
boolean considerSeenTxId = true;
long seenTxId = NNStorage.readTransactionIdFile(sd);
long maxSeenTransaction = 0;
for (EditLogFile elf : getLogFiles(0)) {
if (elf.isInProgress() && !inProgressOk) {
if (elf.getFirstTxId() != HdfsConstants.INVALID_TXID &&
elf.getFirstTxId() <= seenTxId) {
// don't look at the seen_txid file if in-progress logs are not to be
// examined, and the value in seen_txid falls within the in-progress
// segment.
considerSeenTxId = false;
}
continue;
}
@ -339,8 +359,12 @@ private long findMaxTransaction(boolean inProgressOk)
}
maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
}
if (considerSeenTxId) {
return Math.max(maxSeenTransaction, seenTxId);
} else {
return maxSeenTransaction;
}
}
@Override
public String toString() {
@ -354,8 +378,9 @@ static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
private long numTx = -1;
private boolean isCorrupt = false;
private boolean hasCorruptHeader = false;
private final boolean isInProgress;
final static Comparator<EditLogFile> COMPARE_BY_START_TXID
@ -407,11 +432,13 @@ boolean containsTxId(long txId) {
*/
void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
if (val.getNumTransactions() == 0) {
markCorrupt();
} else {
this.numTx = val.getNumTransactions();
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
long getNumTransactions() {
return numTx;
}
boolean isInProgress() {
@ -422,16 +449,12 @@ File getFile() {
return file;
}
void markCorrupt() {
isCorrupt = true;
}
boolean isCorrupt() {
return isCorrupt;
boolean hasCorruptHeader() {
return hasCorruptHeader;
}
void moveAsideCorruptFile() throws IOException {
assert isCorrupt;
assert hasCorruptHeader;
File src = file;
File dst = new File(src.getParent(), src.getName() + ".corrupt");
@ -446,8 +469,9 @@ void moveAsideCorruptFile() throws IOException {
@Override
public String toString() {
return String.format("EditLogFile(file=%s,first=%019d,last=%019d,"
+"inProgress=%b,corrupt=%b)", file.toString(),
firstTxId, lastTxId, isInProgress(), isCorrupt);
+"inProgress=%b,hasCorruptHeader=%b,numTx=%d)",
file.toString(), firstTxId, lastTxId,
isInProgress(), hasCorruptHeader, numTx);
}
}
}

View File

@ -224,7 +224,7 @@ private void doTailEdits() throws IOException, InterruptedException {
editsLoaded = elie.getNumEditsLoaded();
throw elie;
} finally {
if (editsLoaded > 0) {
if (editsLoaded > 0 || LOG.isDebugEnabled()) {
LOG.info(String.format("Loaded %d edits starting from txid %d ",
editsLoaded, lastTxnId));
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -27,6 +28,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
@ -35,7 +37,6 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.Mockito;
/**
@ -204,4 +205,8 @@ public static boolean safeModeInitializedReplQueues(NameNode nn) {
}
return smi.initializedReplQueues;
}
public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {
return NNStorage.getInProgressEditsFile(sd, startTxId);
}
}

View File

@ -629,22 +629,26 @@ private void testCrashRecovery(int numTransactions) throws Exception {
}
}
// should succeed - only one corrupt log dir
public void testCrashRecoveryEmptyLogOneDir() throws Exception {
doTestCrashRecoveryEmptyLog(false, true);
doTestCrashRecoveryEmptyLog(false, true, true);
}
// should fail - seen_txid updated to 3, but no log dir contains txid 3
public void testCrashRecoveryEmptyLogBothDirs() throws Exception {
doTestCrashRecoveryEmptyLog(true, true);
doTestCrashRecoveryEmptyLog(true, true, false);
}
// should succeed - only one corrupt log dir
public void testCrashRecoveryEmptyLogOneDirNoUpdateSeenTxId()
throws Exception {
doTestCrashRecoveryEmptyLog(false, false);
doTestCrashRecoveryEmptyLog(false, false, true);
}
// should succeed - both log dirs corrupt, but seen_txid never updated
public void testCrashRecoveryEmptyLogBothDirsNoUpdateSeenTxId()
throws Exception {
doTestCrashRecoveryEmptyLog(true, false);
doTestCrashRecoveryEmptyLog(true, false, true);
}
/**
@ -660,12 +664,13 @@ public void testCrashRecoveryEmptyLogBothDirsNoUpdateSeenTxId()
* NN should fail to start up, because it's aware that txid 3
* was reached, but unable to find a non-corrupt log starting there.
* @param updateTransactionIdFile if true update the seen_txid file.
* If false, the it will not be updated. This will simulate a case
* where the NN crashed between creating the new segment and updating
* seen_txid.
* If false, it will not be updated. This will simulate a case where
* the NN crashed between creating the new segment and updating the
* seen_txid file.
* @param shouldSucceed true if the test is expected to succeed.
*/
private void doTestCrashRecoveryEmptyLog(boolean inBothDirs,
boolean updateTransactionIdFile)
boolean updateTransactionIdFile, boolean shouldSucceed)
throws Exception {
// start a cluster
Configuration conf = new HdfsConfiguration();
@ -684,30 +689,41 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs,
// Make a truncated edits_3_inprogress
File log = new File(currentDir,
NNStorage.getInProgressEditsFileName(3));
NNStorage storage = new NNStorage(conf,
Collections.<URI>emptyList(),
Lists.newArrayList(uri));
if (updateTransactionIdFile) {
storage.writeTransactionIdFileToStorage(3);
}
storage.close();
new EditLogFileOutputStream(log, 1024).create();
if (!inBothDirs) {
break;
}
NNStorage storage = new NNStorage(conf,
Collections.<URI>emptyList(),
Lists.newArrayList(uri));
if (updateTransactionIdFile) {
storage.writeTransactionIdFileToStorage(3);
}
storage.close();
}
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATA_NODES).format(false).build();
fail("Did not fail to start with all-corrupt logs");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"No non-corrupt logs for txid 3", ioe);
if (!shouldSucceed) {
fail("Should not have succeeded in startin cluster");
}
} catch (IOException ioe) {
if (shouldSucceed) {
LOG.info("Should have succeeded in starting cluster, but failed", ioe);
throw ioe;
} else {
GenericTestUtils.assertExceptionContains(
"No non-corrupt logs for txid 3",
ioe);
}
} finally {
cluster.shutdown();
}
}
private static class EditLogByteInputStream extends EditLogInputStream {
@ -1082,9 +1098,7 @@ public boolean accept(File dir, String name) {
editlog.initJournalsForWrite();
long startTxId = 1;
try {
Iterable<EditLogInputStream> editStreams
= editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL);
editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL);
fail("Should have thrown exception");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(

View File

@ -243,7 +243,9 @@ public void testCountValidTransactions() throws IOException {
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, offset);
EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile);
assertTrue(val.getNumTransactions() >= prevNumValid);
assertTrue(String.format("%d should have been >= %d",
val.getNumTransactions(), prevNumValid),
val.getNumTransactions() >= prevNumValid);
prevNumValid = val.getNumTransactions();
}
}

View File

@ -255,7 +255,8 @@ public boolean accept(File dir, String name) {
jm.getNumberOfTransactions(startGapTxId, true);
fail("Should have thrown an exception by now");
} catch (IOException ioe) {
assertTrue(true);
GenericTestUtils.assertExceptionContains(
"Gap in transactions, max txnid is 110, 0 txns from 31", ioe);
}
// rolled 10 times so there should be 11 files.

View File

@ -17,14 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -33,7 +41,10 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
@ -41,8 +52,9 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -60,6 +72,10 @@ public class TestHAStateTransitions {
private static final String TEST_FILE_DATA =
"Hello state transitioning world";
static {
((Log4JLogger)EditLogTailer.LOG).getLogger().setLevel(Level.ALL);
}
/**
* Test which takes a single node and flip flops between
* active and standby mode, making sure it doesn't
@ -354,4 +370,55 @@ public void testManualFailoverFailbackFederationHA() throws Exception {
cluster.shutdown();
}
}
@Test
public void testFailoverWithEmptyInProgressEditLog() throws Exception {
testFailoverAfterCrashDuringLogRoll(false);
}
@Test
public void testFailoverWithEmptyInProgressEditLogWithHeader()
throws Exception {
testFailoverAfterCrashDuringLogRoll(true);
}
private static void testFailoverAfterCrashDuringLogRoll(boolean writeHeader)
throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, Integer.MAX_VALUE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
try {
cluster.transitionToActive(0);
NameNode nn0 = cluster.getNameNode(0);
nn0.getRpcServer().rollEditLog();
cluster.shutdownNameNode(0);
createEmptyInProgressEditLog(cluster, nn0, writeHeader);
cluster.transitionToActive(1);
} finally {
IOUtils.cleanup(LOG, fs);
cluster.shutdown();
}
}
private static void createEmptyInProgressEditLog(MiniDFSCluster cluster,
NameNode nn, boolean writeHeader) throws IOException {
long txid = nn.getNamesystem().getEditLog().getLastWrittenTxId();
URI sharedEditsUri = cluster.getSharedEditsDir(0, 1);
File sharedEditsDir = new File(sharedEditsUri.getPath());
StorageDirectory storageDir = new StorageDirectory(sharedEditsDir);
File inProgressFile = NameNodeAdapter.getInProgressEditsFile(storageDir,
txid + 1);
assertTrue("Failed to create in-progress edits file",
inProgressFile.createNewFile());
if (writeHeader) {
DataOutputStream out = new DataOutputStream(new FileOutputStream(
inProgressFile));
EditLogFileOutputStream.writeHeader(out);
}
}
}

View File

@ -80,8 +80,8 @@ public static void assertGlobEquals(File dir, String pattern,
public static void assertExceptionContains(String string, Throwable t) {
String msg = t.getMessage();
Assert.assertTrue(
"Unexpected exception:" + StringUtils.stringifyException(t),
msg.contains(string));
"Expected to find '" + string + "' but got unexpected exception:"
+ StringUtils.stringifyException(t), msg.contains(string));
}
public static void waitFor(Supplier<Boolean> check,