HDFS-3277. fail over to loading a different FSImage if the first one we try to load is corrupt. Contributed by Colin Patrick McCabe and Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1456578 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-03-14 18:04:04 +00:00
parent 1b6f0582fb
commit bcabbcdf4c
12 changed files with 247 additions and 111 deletions

View File

@ -414,6 +414,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4591. HA clients can fail to fail over while Standby NN is performing
long checkpoint. (atm)
HDFS-3277. fail over to loading a different FSImage if the first one we
try to load is corrupt. (Colin Patrick McCabe and Andrew Wang via atm)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@ -584,11 +585,11 @@ void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs();
FSImageFile imageFile = null;
isUpgradeFinalized = inspector.isUpgradeFinalized();
FSImageStorageInspector.FSImageFile imageFile
= inspector.getLatestImage();
List<FSImageFile> imageFiles = inspector.getLatestImages();
boolean needToSave = inspector.needToSave();
Iterable<EditLogInputStream> editStreams = null;
@ -601,7 +602,8 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
// we better be able to load all the edits. If we're the standby NN, it's
// OK to not be able to read all of edits right now.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
editStreams = editLog.selectInputStreams(
imageFiles.get(0).getCheckpointTxId() + 1,
toAtLeastTxId, recovery, false);
} else {
editStreams = FSImagePreTransactionalStorageInspector
@ -614,7 +616,6 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
elis.setMaxOpSize(maxOpSize);
}
LOG.debug("Planning to load image :\n" + imageFile);
for (EditLogInputStream l : editStreams) {
LOG.debug("Planning to load edit log stream: " + l);
}
@ -622,7 +623,32 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
LOG.info("No edit log streams selected.");
}
for (int i = 0; i < imageFiles.size(); i++) {
try {
imageFile = imageFiles.get(i);
loadFSImageFile(target, recovery, imageFile);
break;
} catch (IOException ioe) {
LOG.error("Failed to load image from " + imageFile, ioe);
target.clear();
imageFile = null;
}
}
// Failed to load any images, error out
if (imageFile == null) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load an FSImage file!");
}
long txnsAdvanced = loadEdits(editStreams, target, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
editLog.setNextTxId(lastAppliedTxId + 1);
return needToSave;
}
void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
FSImageFile imageFile) throws IOException {
LOG.debug("Planning to load image :\n" + imageFile);
StorageDirectory sdForProperties = imageFile.sd;
storage.readProperties(sdForProperties);
@ -647,15 +673,6 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
// We don't have any record of the md5sum
loadFSImage(imageFile.getFile(), null, target, recovery);
}
} catch (IOException ioe) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load image from " + imageFile, ioe);
}
long txnsAdvanced = loadEdits(editStreams, target, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
editLog.setNextTxId(lastAppliedTxId + 1);
return needToSave;
}
public void initEditLog() {

View File

@ -232,8 +232,8 @@ void load(File curFile)
loadSecretManagerState(in);
// make sure to read to the end of file
int eof = in.read();
assert eof == -1 : "Should have reached the end of image file " + curFile;
boolean eof = (in.read() == -1);
assert eof : "Should have reached the end of image file " + curFile;
} finally {
in.close();
}

View File

@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@ -146,7 +147,7 @@ boolean isUpgradeFinalized() {
}
@Override
FSImageFile getLatestImage() throws IOException {
List<FSImageFile> getLatestImages() throws IOException {
// We should have at least one image and one edits dirs
if (latestNameSD == null)
throw new IOException("Image file is not found in " + imageDirs);
@ -176,9 +177,12 @@ FSImageFile getLatestImage() throws IOException {
needToSaveAfterRecovery = doRecovery();
return new FSImageFile(latestNameSD,
FSImageFile file = new FSImageFile(latestNameSD,
NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE),
HdfsConstants.INVALID_TXID);
LinkedList<FSImageFile> ret = new LinkedList<FSImageFile>();
ret.add(file);
return ret;
}
@Override

View File

@ -19,6 +19,8 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -45,7 +47,7 @@ abstract class FSImageStorageInspector {
* Get the image files which should be loaded into the filesystem.
* @throws IOException if not enough files are available (eg no image found in any directory)
*/
abstract FSImageFile getLatestImage() throws IOException;
abstract List<FSImageFile> getLatestImages() throws IOException;
/**
* Get the minimum tx id which should be loaded with this set of images.

View File

@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -108,24 +109,31 @@ public boolean isUpgradeFinalized() {
}
/**
* @return the image that has the most recent associated transaction ID.
* If there are multiple storage directories which contain equal images
* the storage directory that was inspected first will be preferred.
* @return the image files that have the most recent associated
* transaction IDs. If there are multiple storage directories which
* contain equal images, we'll return them all.
*
* @throws FileNotFoundException if not images are found.
*/
@Override
FSImageFile getLatestImage() throws IOException {
if (foundImages.isEmpty()) {
throw new FileNotFoundException("No valid image files found");
}
FSImageFile ret = null;
List<FSImageFile> getLatestImages() throws IOException {
LinkedList<FSImageFile> ret = new LinkedList<FSImageFile>();
for (FSImageFile img : foundImages) {
if (ret == null || img.txId > ret.txId) {
ret = img;
if (ret.isEmpty()) {
ret.add(img);
} else {
FSImageFile cur = ret.getFirst();
if (cur.txId == img.txId) {
ret.add(img);
} else if (cur.txId < img.txId) {
ret.clear();
ret.add(img);
}
}
}
if (ret.isEmpty()) {
throw new FileNotFoundException("No valid image files found");
}
return ret;
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
/**
* Used to verify that certain exceptions or messages are present in log output.
*/
public class LogVerificationAppender extends AppenderSkeleton {
private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
protected void append(final LoggingEvent loggingEvent) {
log.add(loggingEvent);
}
@Override
public void close() {
}
public List<LoggingEvent> getLog() {
return new ArrayList<LoggingEvent>(log);
}
public int countExceptionsWithMessage(final String text) {
int count = 0;
for (LoggingEvent e: getLog()) {
ThrowableInformation t = e.getThrowableInformation();
if (t != null) {
String m = t.getThrowable().getMessage();
if (m.contains(text)) {
count++;
}
}
}
return count;
}
}

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
import org.junit.Test;
/**
@ -293,6 +294,11 @@ public void testUpgradeFromCorruptRel22Image() throws IOException {
new File(baseDir, "name2/current/VERSION"),
"imageMD5Digest", "22222222222222222222222222222222");
// Attach our own log appender so we can verify output
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
// Upgrade should now fail
try {
upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
@ -300,9 +306,12 @@ public void testUpgradeFromCorruptRel22Image() throws IOException {
fail("Upgrade did not fail with bad MD5");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);
if (!msg.contains("is corrupt with MD5 checksum")) {
if (!msg.contains("Failed to load an FSImage file")) {
throw ioe;
}
int md5failures = appender.countExceptionsWithMessage(
" is corrupt with MD5 checksum of ");
assertEquals("Upgrade did not fail with bad MD5", 1, md5failures);
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -45,7 +46,6 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Time;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
@ -419,7 +419,7 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
}
final TestAppender appender = new TestAppender();
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
@ -447,28 +447,6 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
}
}
class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
protected void append(final LoggingEvent loggingEvent) {
log.add(loggingEvent);
}
@Override
public void close() {
}
public List<LoggingEvent> getLog() {
return new ArrayList<LoggingEvent>(log);
}
}
private boolean containsWithinRange(DatanodeDescriptor target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;

View File

@ -272,15 +272,15 @@ public static void assertSameNewestImage(List<File> dirs) throws Exception {
for (File dir : dirs) {
FSImageTransactionalStorageInspector inspector =
inspectStorageDirectory(dir, NameNodeDirType.IMAGE);
FSImageFile latestImage = inspector.getLatestImage();
assertNotNull("No image in " + dir, latestImage);
long thisTxId = latestImage.getCheckpointTxId();
List<FSImageFile> latestImages = inspector.getLatestImages();
assert(!latestImages.isEmpty());
long thisTxId = latestImages.get(0).getCheckpointTxId();
if (imageTxId != -1 && thisTxId != imageTxId) {
fail("Storage directory " + dir + " does not have the same " +
"last image index " + imageTxId + " as another");
}
imageTxId = thisTxId;
imageFiles.add(inspector.getLatestImage().getFile());
imageFiles.add(inspector.getLatestImages().get(0).getFile());
}
assertFileContentsSame(imageFiles.toArray(new File[0]));
@ -424,7 +424,7 @@ public static File findLatestImageFile(StorageDirectory sd)
new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(sd);
return inspector.getLatestImage().getFile();
return inspector.getLatestImages().get(0).getFile();
}
/**
@ -439,8 +439,8 @@ public static File findNewestImageFile(String currentDirPath) throws IOException
new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(sd);
FSImageFile latestImage = inspector.getLatestImage();
return (latestImage == null) ? null : latestImage.getFile();
List<FSImageFile> latestImages = inspector.getLatestImages();
return (latestImages.isEmpty()) ? null : latestImages.get(0).getFile();
}
/**

View File

@ -57,7 +57,7 @@ public void testCurrentStorageInspector() throws IOException {
inspector.inspectDirectory(mockDir);
assertEquals(2, inspector.foundImages.size());
FSImageFile latestImage = inspector.getLatestImage();
FSImageFile latestImage = inspector.getLatestImages().get(0);
assertEquals(456, latestImage.txId);
assertSame(mockDir, latestImage.sd);
assertTrue(inspector.isUpgradeFinalized());

View File

@ -31,12 +31,10 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@ -46,17 +44,21 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -112,10 +114,11 @@ public void tearDown() throws Exception {
}
/**
* start MiniDFScluster, create a file (to create edits) and do a checkpoint
* Create a number of fsimage checkpoints
* @param count number of checkpoints to create
* @throws IOException
*/
public void createCheckPoint() throws IOException {
public void createCheckPoint(int count) throws IOException {
LOG.info("--starting mini cluster");
// manage dirs parameter set to false
MiniDFSCluster cluster = null;
@ -133,15 +136,18 @@ public void createCheckPoint() throws IOException {
sn = new SecondaryNameNode(config);
assertNotNull(sn);
// Create count new files and checkpoints
for (int i=0; i<count; i++) {
// create a file
FileSystem fileSys = cluster.getFileSystem();
Path file1 = new Path("t1");
DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize,
(short) 1, seed);
Path p = new Path("t" + i);
DFSTestUtil.createFile(fileSys, p, fileSize, fileSize,
blockSize, (short) 1, seed);
LOG.info("--file " + p.toString() + " created");
LOG.info("--doing checkpoint");
sn.doCheckpoint(); // this shouldn't fail
LOG.info("--done checkpoint");
}
} catch (IOException e) {
fail(StringUtils.stringifyException(e));
System.err.println("checkpoint failed");
@ -151,7 +157,36 @@ public void createCheckPoint() throws IOException {
sn.shutdown();
if(cluster!=null)
cluster.shutdown();
LOG.info("--file t1 created, cluster shutdown");
LOG.info("--cluster shutdown");
}
}
/**
* Corrupts the MD5 sum of the fsimage.
*
* @param corruptAll
* whether to corrupt one or all of the MD5 sums in the configured
* namedirs
* @throws IOException
*/
private void corruptFSImageMD5(boolean corruptAll) throws IOException {
List<URI> nameDirs = (List<URI>)FSNamesystem.getNamespaceDirs(config);
// Corrupt the md5 files in all the namedirs
for (URI uri: nameDirs) {
// Directory layout looks like:
// test/data/dfs/nameN/current/{fsimage,edits,...}
File nameDir = new File(uri.getPath());
File dfsDir = nameDir.getParentFile();
assertEquals(dfsDir.getName(), "dfs"); // make sure we got right dir
// Set the md5 file to all zeros
File imageFile = new File(nameDir,
Storage.STORAGE_DIR_CURRENT + "/"
+ NNStorage.getImageFileName(0));
MD5FileUtils.saveMD5File(imageFile, new MD5Hash(new byte[16]));
// Only need to corrupt one if !corruptAll
if (!corruptAll) {
break;
}
}
}
@ -165,7 +200,7 @@ private void corruptNameNodeFiles() throws IOException {
// get name dir and its length, then delete and recreate the directory
File dir = new File(nameDirs.get(0).getPath()); // has only one
this.fsimageLength = new File(new File(dir, "current"),
this.fsimageLength = new File(new File(dir, Storage.STORAGE_DIR_CURRENT),
NameNodeFile.IMAGE.getName()).length();
if(dir.exists() && !(FileUtil.fullyDelete(dir)))
@ -178,7 +213,7 @@ private void corruptNameNodeFiles() throws IOException {
dir = new File( nameEditsDirs.get(0).getPath()); //has only one
this.editsLength = new File(new File(dir, "current"),
this.editsLength = new File(new File(dir, Storage.STORAGE_DIR_CURRENT),
NameNodeFile.EDITS.getName()).length();
if(dir.exists() && !(FileUtil.fullyDelete(dir)))
@ -262,7 +297,7 @@ public void testChkpointStartup2() throws IOException{
config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
fileAsURI(new File(hdfsDir, "chkpt")).toString());
createCheckPoint();
createCheckPoint(1);
corruptNameNodeFiles();
checkNameNodeFiles();
@ -289,7 +324,7 @@ public void testChkpointStartup1() throws IOException{
config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
fileAsURI(new File(hdfsDir, "chkpt")).toString());
createCheckPoint();
createCheckPoint(1);
corruptNameNodeFiles();
checkNameNodeFiles();
}
@ -447,19 +482,17 @@ private void testImageChecksum(boolean compress) throws Exception {
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(new Path("/test"));
// Directory layout looks like:
// test/data/dfs/nameN/current/{fsimage,edits,...}
File nameDir = new File(cluster.getNameDirs(0).iterator().next().getPath());
File dfsDir = nameDir.getParentFile();
assertEquals(dfsDir.getName(), "dfs"); // make sure we got right dir
LOG.info("Shutting down cluster #1");
cluster.shutdown();
cluster = null;
// Corrupt the md5 file to all 0s
File imageFile = new File(nameDir, "current/" + NNStorage.getImageFileName(0));
MD5FileUtils.saveMD5File(imageFile, new MD5Hash(new byte[16]));
// Corrupt the md5 files in all the namedirs
corruptFSImageMD5(true);
// Attach our own log appender so we can verify output
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
// Try to start a new cluster
LOG.info("\n===========================================\n" +
@ -471,9 +504,12 @@ private void testImageChecksum(boolean compress) throws Exception {
.build();
fail("Should not have successfully started with corrupt image");
} catch (IOException ioe) {
if (!ioe.getCause().getMessage().contains("is corrupt with MD5")) {
throw ioe;
}
GenericTestUtils.assertExceptionContains(
"Failed to load an FSImage file!", ioe);
int md5failures = appender.countExceptionsWithMessage(
" is corrupt with MD5 checksum of ");
// Two namedirs, so should have seen two failures
assertEquals(2, md5failures);
}
} finally {
if (cluster != null) {
@ -482,6 +518,21 @@ private void testImageChecksum(boolean compress) throws Exception {
}
}
@Test(timeout=30000)
public void testCorruptImageFallback() throws IOException {
// Create two checkpoints
createCheckPoint(2);
// Delete a single md5sum
corruptFSImageMD5(false);
// Should still be able to start
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.build();
cluster.waitActive();
}
/**
* This test tests hosts include list contains host names. After namenode
* restarts, the still alive datanodes should not have any trouble in getting