HDFS-3335. check for edit log corruption at the end of the log. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1338492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-15 00:41:37 +00:00
parent 2a406d0437
commit 95710c15b7
10 changed files with 341 additions and 48 deletions

View File

@ -459,6 +459,9 @@ Release 2.0.0 - UNRELEASED
HDFS-3404. Make putImage in GetImageServlet infer remote address to fetch
from request. (atm)
HDFS-3335. check for edit log corruption at the end of the log
(Colin Patrick McCabe via todd)
OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)

View File

@ -106,7 +106,35 @@ public String getName() {
@Override
protected FSEditLogOp nextOp() throws IOException {
return reader.readOp(false);
FSEditLogOp op = reader.readOp(false);
if ((op != null) && (op.hasTransactionId())) {
long txId = op.getTransactionId();
if ((txId >= lastTxId) &&
(lastTxId != HdfsConstants.INVALID_TXID)) {
//
// Sometimes, the NameNode crashes while it's writing to the
// edit log. In that case, you can end up with an unfinalized edit log
// which has some garbage at the end.
// JournalManager#recoverUnfinalizedSegments will finalize these
// unfinished edit logs, giving them a defined final transaction
// ID. Then they will be renamed, so that any subsequent
// readers will have this information.
//
// Since there may be garbage at the end of these "cleaned up"
// logs, we want to be sure to skip it here if we've read everything
// we were supposed to read out of the stream.
// So we force an EOF on all subsequent reads.
//
long skipAmt = file.length() - tracker.getPos();
if (skipAmt > 0) {
FSImage.LOG.warn("skipping " + skipAmt + " bytes at the end " +
"of edit log '" + getName() + "': reached txid " + txId +
" out of " + lastTxId);
tracker.skip(skipAmt);
}
}
}
return op;
}
@Override

View File

@ -41,12 +41,13 @@
@InterfaceAudience.Private
public class EditLogFileOutputStream extends EditLogOutputStream {
private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
public static final int PREALLOCATION_LENGTH = 1024 * 1024;
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
private EditsDoubleBuffer doubleBuf;
static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH);
static {
fill.position(0);

View File

@ -144,7 +144,7 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
check203UpgradeFailure(logVersion, e);
String errorMessage =
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
FSImage.LOG.error(errorMessage);
FSImage.LOG.error(errorMessage, e);
if (recovery == null) {
// We will only try to skip over problematic opcodes when in
// recovery mode.
@ -730,29 +730,34 @@ public PositionTrackingInputStream(InputStream is) {
super(is);
}
@Override
public int read() throws IOException {
int ret = super.read();
if (ret != -1) curPos++;
return ret;
}
@Override
public int read(byte[] data) throws IOException {
int ret = super.read(data);
if (ret > 0) curPos += ret;
return ret;
}
@Override
public int read(byte[] data, int offset, int length) throws IOException {
int ret = super.read(data, offset, length);
if (ret > 0) curPos += ret;
return ret;
}
@Override
public void mark(int limit) {
super.mark(limit);
markPos = curPos;
}
@Override
public void reset() throws IOException {
if (markPos == -1) {
throw new IOException("Not marked!");
@ -765,6 +770,13 @@ public void reset() throws IOException {
public long getPos() {
return curPos;
}
@Override
public long skip(long amt) throws IOException {
long ret = super.skip(amt);
curPos += ret;
return ret;
}
}
public long getLastAppliedTxId() {

View File

@ -75,6 +75,7 @@
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
@SuppressWarnings("deprecation")
@ -2263,31 +2264,76 @@ public Reader(DataInputStream in, int logVersion) {
*
* @param skipBrokenEdits If true, attempt to skip over damaged parts of
* the input stream, rather than throwing an IOException
* @return the operation read from the stream, or null at the end of the file
* @throws IOException on error.
* @return the operation read from the stream, or null at the end of the
* file
* @throws IOException on error. This function should only throw an
* exception when skipBrokenEdits is false.
*/
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
FSEditLogOp op = null;
while (true) {
try {
in.mark(in.available());
try {
op = decodeOp();
} finally {
// If we encountered an exception or an end-of-file condition,
// do not advance the input stream.
if (op == null) {
in.reset();
}
}
return op;
} catch (IOException e) {
in.mark(MAX_OP_SIZE);
return decodeOp();
} catch (GarbageAfterTerminatorException e) {
in.reset();
if (!skipBrokenEdits) {
throw e;
}
if (in.skip(1) < 1) {
// If we saw a terminator opcode followed by a long region of 0x00 or
// 0xff, we want to skip over that region, because there's nothing
// interesting there.
long numSkip = e.getNumAfterTerminator();
if (in.skip(numSkip) < numSkip) {
FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
"garbage after an OP_INVALID. Unexpected early EOF.");
return null;
}
} catch (IOException e) {
in.reset();
if (!skipBrokenEdits) {
throw e;
}
} catch (RuntimeException e) {
// FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
// However, we handle it here for recovery mode, just to be more
// robust.
in.reset();
if (!skipBrokenEdits) {
throw e;
}
} catch (Throwable e) {
in.reset();
if (!skipBrokenEdits) {
throw new IOException("got unexpected exception " +
e.getMessage(), e);
}
}
// Move ahead one byte and re-try the decode process.
if (in.skip(1) < 1) {
return null;
}
}
}
private void verifyTerminator() throws IOException {
long off = 0;
/** The end of the edit log should contain only 0x00 or 0xff bytes.
* If it contains other bytes, the log itself may be corrupt.
* It is important to check this; if we don't, a stray OP_INVALID byte
* could make us stop reading the edit log halfway through, and we'd never
* know that we had lost data.
*/
byte[] buf = new byte[4096];
while (true) {
int numRead = in.read(buf);
if (numRead == -1) {
return;
}
for (int i = 0; i < numRead; i++, off++) {
if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) {
throw new GarbageAfterTerminatorException("Read garbage after " +
"the terminator!", off);
}
}
}
}
@ -2306,8 +2352,10 @@ private FSEditLogOp decodeOp() throws IOException {
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID)
if (opCode == OP_INVALID) {
verifyTerminator();
return null;
}
FSEditLogOp op = cache.get(opCode);
if (op == null) {
@ -2477,4 +2525,35 @@ public static PermissionStatus permissionStatusFromXml(Stanza st)
short mode = Short.valueOf(st.getValue("MODE"));
return new PermissionStatus(username, groupname, new FsPermission(mode));
}
}
/**
* Exception indicating that we found an OP_INVALID followed by some
* garbage. An OP_INVALID should signify the end of the file... if there
* is additional content after that, then the edit log is corrupt.
*/
static class GarbageAfterTerminatorException extends IOException {
private static final long serialVersionUID = 1L;
private final long numAfterTerminator;
public GarbageAfterTerminatorException(String str,
long numAfterTerminator) {
super(str);
this.numAfterTerminator = numAfterTerminator;
}
/**
* Get the number of bytes after the terminator at which the garbage
* appeared.
*
* So if you had an OP_INVALID followed immediately by another valid opcode,
* this would be 0.
* If you had an OP_INVALID followed by some padding bytes, followed by a
* stray byte at the end, this would be the number of padding bytes.
*
* @return numAfterTerminator
*/
public long getNumAfterTerminator() {
return numAfterTerminator;
}
}
}

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -536,6 +537,11 @@ public FSEditLog getEditLog() {
return editLog;
}
@VisibleForTesting
void setEditLogForTesting(FSEditLog newLog) {
editLog = newLog;
}
void openEditLogForWrite() throws IOException {
assert editLog != null : "editLog must be initialized";
editLog.openForWrite();

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@ -48,7 +49,8 @@ static OfflineEditsLoader createLoader(OfflineEditsVisitor visitor,
OfflineEditsLoader loader = null;
try {
file = new File(inputFileName);
elis = new EditLogFileInputStream(file, -1, -1, false);
elis = new EditLogFileInputStream(file, HdfsConstants.INVALID_TXID,
HdfsConstants.INVALID_TXID, false);
loader = new OfflineEditsBinaryLoader(visitor, elis);
} finally {
if ((loader == null) && (elis != null)) {

View File

@ -527,7 +527,7 @@ public void testEditChecksum() throws Exception {
} catch (IOException e) {
// expected
assertEquals("Cause of exception should be ChecksumException",
e.getCause().getClass(), ChecksumException.class);
ChecksumException.class, e.getCause().getClass());
}
}

View File

@ -68,7 +68,7 @@ public void testPreallocation() throws IOException {
assertEquals(1, validation.getNumTransactions());
assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " +
"for the version number",
PREALLOCATION_LENGTH, editLog.length());
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
cluster.getFileSystem().mkdirs(new Path("/tmp"),
@ -82,7 +82,7 @@ public void testPreallocation() throws IOException {
assertEquals(2, validation.getNumTransactions());
assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number",
PREALLOCATION_LENGTH, editLog.length());
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
// 256 blocks for the 1MB of preallocation space
assertTrue("Edit log disk space used should be at least 257 blocks",
256 * 4096 <= new DU(editLog, conf).getUsed());

View File

@ -25,6 +25,8 @@
import java.util.Set;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,7 +39,6 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@ -213,15 +214,129 @@ public Set<Long> getValidTxIds() {
public void testSkipEdit() throws IOException {
runEditLogTest(new EltsTestGarbageInEditLog());
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been truncated. */
@Test(timeout=180000)
public void testRecoverTruncatedEditLog() throws IOException {
/**
* An algorithm for corrupting an edit log.
*/
static interface Corruptor {
/*
* Corrupt an edit log file.
*
* @param editFile The edit log file
*/
public void corrupt(File editFile) throws IOException;
/*
* Explain whether we need to read the log in recovery mode
*
* @param finalized True if the edit log in question is finalized.
* We're a little more lax about reading unfinalized
* logs. We will allow a small amount of garbage at
* the end. In a finalized log, every byte must be
* perfect.
*
* @return Whether we need to read the log in recovery mode
*/
public boolean needRecovery(boolean finalized);
/*
* Get the name of this corruptor
*
* @return The Corruptor name
*/
public String getName();
}
static class TruncatingCorruptor implements Corruptor {
@Override
public void corrupt(File editFile) throws IOException {
// Corrupt the last edit
long fileLen = editFile.length();
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.setLength(fileLen - 1);
rwf.close();
}
@Override
public boolean needRecovery(boolean finalized) {
return finalized;
}
@Override
public String getName() {
return "truncated";
}
}
static class PaddingCorruptor implements Corruptor {
@Override
public void corrupt(File editFile) throws IOException {
// Add junk to the end of the file
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.seek(editFile.length());
for (int i = 0; i < 129; i++) {
rwf.write((byte)0);
}
rwf.write(0xd);
rwf.write(0xe);
rwf.write(0xa);
rwf.write(0xd);
rwf.close();
}
@Override
public boolean needRecovery(boolean finalized) {
// With finalized edit logs, we ignore what's at the end as long as we
// can make it to the correct transaction ID.
// With unfinalized edit logs, the finalization process ignores garbage
// at the end.
return false;
}
@Override
public String getName() {
return "padFatal";
}
}
static class SafePaddingCorruptor implements Corruptor {
private byte padByte;
public SafePaddingCorruptor(byte padByte) {
this.padByte = padByte;
assert ((this.padByte == 0) || (this.padByte == -1));
}
@Override
public void corrupt(File editFile) throws IOException {
// Add junk to the end of the file
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.seek(editFile.length());
rwf.write((byte)-1);
for (int i = 0; i < 1024; i++) {
rwf.write(padByte);
}
rwf.close();
}
@Override
public boolean needRecovery(boolean finalized) {
return false;
}
@Override
public String getName() {
return "pad" + ((int)padByte);
}
}
static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
throws IOException {
final String TEST_PATH = "/test/path/dir";
final int NUM_TEST_MKDIRS = 10;
// start a cluster
final boolean needRecovery = corruptor.needRecovery(finalize);
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
@ -230,6 +345,15 @@ public void testRecoverTruncatedEditLog() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
cluster.waitActive();
if (!finalize) {
// Normally, the in-progress edit log would be finalized by
// FSEditLog#endCurrentLogSegment. For testing purposes, we
// disable that here.
FSEditLog spyLog =
spy(cluster.getNameNode().getFSImage().getEditLog());
doNothing().when(spyLog).endCurrentLogSegment(true);
cluster.getNameNode().getFSImage().setEditLogForTesting(spyLog);
}
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
FSImage fsimage = namesystem.getFSImage();
@ -246,13 +370,11 @@ public void testRecoverTruncatedEditLog() throws IOException {
File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile();
assertTrue("Should exist: " + editFile, editFile.exists());
// Corrupt the last edit
long fileLen = editFile.length();
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.setLength(fileLen - 1);
rwf.close();
// Make sure that we can't start the cluster normally before recovery
// Corrupt the edit log
corruptor.corrupt(editFile);
// If needRecovery == true, make sure that we can't start the
// cluster normally before recovery
cluster = null;
try {
LOG.debug("trying to start normally (this should fail)...");
@ -260,16 +382,24 @@ public void testRecoverTruncatedEditLog() throws IOException {
.format(false).build();
cluster.waitActive();
cluster.shutdown();
fail("expected the truncated edit log to prevent normal startup");
if (needRecovery) {
fail("expected the corrupted edit log to prevent normal startup");
}
} catch (IOException e) {
// success
if (!needRecovery) {
LOG.error("Got unexpected failure with " + corruptor.getName() +
corruptor, e);
fail("got unexpected exception " + e.getMessage());
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
// Perform recovery
// Perform NameNode recovery.
// Even if there was nothing wrong previously (needRecovery == false),
// this should still work fine.
cluster = null;
try {
LOG.debug("running recovery...");
@ -277,22 +407,22 @@ public void testRecoverTruncatedEditLog() throws IOException {
.format(false).startupOption(recoverStartOpt).build();
} catch (IOException e) {
fail("caught IOException while trying to recover. " +
"message was " + e.getMessage() +
"message was " + e.getMessage() +
"\nstack trace\n" + StringUtils.stringifyException(e));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
// Make sure that we can start the cluster normally after recovery
cluster = null;
try {
LOG.debug("starting cluster normally after recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build();
LOG.debug("testRecoverTruncatedEditLog: successfully recovered the " +
"truncated edit log");
LOG.debug("successfully recovered the " + corruptor.getName() +
" corrupted edit log");
assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH)));
} catch (IOException e) {
fail("failed to recover. Error message: " + e.getMessage());
@ -302,4 +432,36 @@ public void testRecoverTruncatedEditLog() throws IOException {
}
}
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been truncated. */
@Test(timeout=180000)
public void testRecoverTruncatedEditLog() throws IOException {
testNameNodeRecoveryImpl(new TruncatingCorruptor(), true);
testNameNodeRecoveryImpl(new TruncatingCorruptor(), false);
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been padded with garbage. */
@Test(timeout=180000)
public void testRecoverPaddedEditLog() throws IOException {
testNameNodeRecoveryImpl(new PaddingCorruptor(), true);
testNameNodeRecoveryImpl(new PaddingCorruptor(), false);
}
/** Test that don't need to recover from a situation where the last
* entry in the edit log has been padded with 0. */
@Test(timeout=180000)
public void testRecoverZeroPaddedEditLog() throws IOException {
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)0), true);
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)0), false);
}
/** Test that don't need to recover from a situation where the last
* entry in the edit log has been padded with 0xff bytes. */
@Test(timeout=180000)
public void testRecoverNegativeOnePaddedEditLog() throws IOException {
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)-1), true);
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)-1), false);
}
}