HDFS-2187. Make EditLogInputStream act like an iterator over FSEditLogOps. Contributed by Ivan Kelly and Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1153996 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-08-04 20:22:13 +00:00
parent 924ed13664
commit 53190cfa1d
12 changed files with 269 additions and 211 deletions

View File

@ -629,6 +629,9 @@ Trunk (unreleased changes)
HDFS-2199. Move blockTokenSecretManager from FSNamesystem to BlockManager. HDFS-2199. Move blockTokenSecretManager from FSNamesystem to BlockManager.
(Uma Maheswara Rao G via szetszwo) (Uma Maheswara Rao G via szetszwo)
HDFS-2187. Make EditLogInputStream act like an iterator over FSEditLogOps
(Ivan Kelly and todd via todd)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -210,14 +210,13 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("data:" + StringUtils.byteToHexString(data)); LOG.debug("data:" + StringUtils.byteToHexString(data));
} }
backupInputStream.setBytes(data);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
int logVersion = storage.getLayoutVersion(); int logVersion = storage.getLayoutVersion();
BufferedInputStream bin = new BufferedInputStream(backupInputStream); backupInputStream.setBytes(data, logVersion);
DataInputStream in = new DataInputStream(bin);
Checksum checksum = FSEditLog.getChecksum(); int numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream,
int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true, true, lastAppliedTxId + 1);
lastAppliedTxId + 1);
if (numLoaded != numTxns) { if (numLoaded != numTxns) {
throw new IOException("Batch of txns starting at txnid " + throw new IOException("Batch of txns starting at txnid " +
firstTxId + " was supposed to contain " + numTxns + firstTxId + " was supposed to contain " + numTxns +

View File

@ -21,6 +21,8 @@
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import com.google.common.base.Preconditions;
/** /**
* An implementation of the abstract class {@link EditLogInputStream}, * An implementation of the abstract class {@link EditLogInputStream},
* which is used to updates HDFS meta-data state on a backup node. * which is used to updates HDFS meta-data state on a backup node.
@ -33,6 +35,9 @@ class EditLogBackupInputStream extends EditLogInputStream {
String address; // sender address String address; // sender address
private ByteBufferInputStream inner; private ByteBufferInputStream inner;
private DataInputStream in; private DataInputStream in;
private FSEditLogOp.Reader reader = null;
private FSEditLogLoader.PositionTrackingInputStream tracker = null;
private int version = 0;
/** /**
* A ByteArrayInputStream, which lets modify the underlying byte array. * A ByteArrayInputStream, which lets modify the underlying byte array.
@ -60,7 +65,8 @@ int length() {
EditLogBackupInputStream(String name) throws IOException { EditLogBackupInputStream(String name) throws IOException {
address = name; address = name;
inner = new ByteBufferInputStream(); inner = new ByteBufferInputStream();
in = new DataInputStream(inner); in = null;
reader = null;
} }
@Override // JournalStream @Override // JournalStream
@ -74,18 +80,20 @@ public JournalType getType() {
} }
@Override @Override
public int available() throws IOException { public FSEditLogOp readOp() throws IOException {
return in.available(); Preconditions.checkState(reader != null,
"Must call setBytes() before readOp()");
return reader.readOp();
} }
@Override @Override
public int read() throws IOException { public int getVersion() throws IOException {
return in.read(); return this.version;
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public long getPosition() {
return in.read(b, off, len); return tracker.getPos();
} }
@Override @Override
@ -99,16 +107,19 @@ long length() throws IOException {
return inner.length(); return inner.length();
} }
DataInputStream getDataInputStream() { void setBytes(byte[] newBytes, int version) throws IOException {
return in;
}
void setBytes(byte[] newBytes) throws IOException {
inner.setData(newBytes); inner.setData(newBytes);
in.reset(); tracker = new FSEditLogLoader.PositionTrackingInputStream(inner);
in = new DataInputStream(tracker);
this.version = version;
reader = new FSEditLogOp.Reader(in, version);
} }
void clear() throws IOException { void clear() throws IOException {
setBytes(null); setBytes(null, 0);
reader = null;
this.version = 0;
} }
} }

View File

@ -21,18 +21,51 @@
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.DataInputStream;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
/** /**
* An implementation of the abstract class {@link EditLogInputStream}, which * An implementation of the abstract class {@link EditLogInputStream}, which
* reads edits from a local file. * reads edits from a local file.
*/ */
class EditLogFileInputStream extends EditLogInputStream { class EditLogFileInputStream extends EditLogInputStream {
private File file; private final File file;
private FileInputStream fStream; private final FileInputStream fStream;
private final int logVersion;
EditLogFileInputStream(File name) throws IOException { private final FSEditLogOp.Reader reader;
private final FSEditLogLoader.PositionTrackingInputStream tracker;
/**
* Open an EditLogInputStream for the given file.
* @param name filename to open
* @throws LogHeaderCorruptException if the header is either missing or
* appears to be corrupt/truncated
* @throws IOException if an actual IO error occurs while reading the
* header
*/
EditLogFileInputStream(File name)
throws LogHeaderCorruptException, IOException {
file = name; file = name;
fStream = new FileInputStream(name); fStream = new FileInputStream(name);
BufferedInputStream bin = new BufferedInputStream(fStream);
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
try {
logVersion = readLogVersion(in);
} catch (EOFException eofe) {
throw new LogHeaderCorruptException("No header found in log");
}
reader = new FSEditLogOp.Reader(in, logVersion);
} }
@Override // JournalStream @Override // JournalStream
@ -46,18 +79,18 @@ public JournalType getType() {
} }
@Override @Override
public int available() throws IOException { public FSEditLogOp readOp() throws IOException {
return fStream.available(); return reader.readOp();
} }
@Override @Override
public int read() throws IOException { public int getVersion() throws IOException {
return fStream.read(); return logVersion;
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public long getPosition() {
return fStream.read(b, off, len); return tracker.getPos();
} }
@Override @Override
@ -76,4 +109,62 @@ public String toString() {
return getName(); return getName();
} }
static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOException {
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
} catch (LogHeaderCorruptException corrupt) {
// If it's missing its header, this is equivalent to no transactions
FSImage.LOG.warn("Log at " + file + " has no valid header",
corrupt);
return new FSEditLogLoader.EditLogValidation(0, 0);
}
try {
return FSEditLogLoader.validateEditLog(in);
} finally {
IOUtils.closeStream(in);
}
}
/**
* Read the header of fsedit log
* @param in fsedit stream
* @return the edit log version number
* @throws IOException if error occurs
*/
@VisibleForTesting
static int readLogVersion(DataInputStream in)
throws IOException, LogHeaderCorruptException {
int logVersion;
try {
logVersion = in.readInt();
} catch (EOFException eofe) {
throw new LogHeaderCorruptException(
"Reached EOF when reading log header");
}
if (logVersion < FSConstants.LAYOUT_VERSION) { // future version
throw new LogHeaderCorruptException(
"Unexpected version of the file system log file: "
+ logVersion + ". Current version = "
+ FSConstants.LAYOUT_VERSION + ".");
}
assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
"Unsupported version " + logVersion;
return logVersion;
}
/**
* Exception indicating that the header of an edits log file is
* corrupted. This can be because the header is not present,
* or because the header data is invalid (eg claims to be
* over a newer version than the running NameNode)
*/
static class LogHeaderCorruptException extends IOException {
private static final long serialVersionUID = 1L;
private LogHeaderCorruptException(String msg) {
super(msg);
}
}
} }

View File

@ -17,10 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedInputStream; import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
/** /**
* A generic abstract class to support reading edits log data from * A generic abstract class to support reading edits log data from
@ -29,29 +27,41 @@
* It should stream bytes from the storage exactly as they were written * It should stream bytes from the storage exactly as they were written
* into the #{@link EditLogOutputStream}. * into the #{@link EditLogOutputStream}.
*/ */
abstract class EditLogInputStream extends InputStream abstract class EditLogInputStream implements JournalStream, Closeable {
implements JournalStream { /**
/** {@inheritDoc} */ * Close the stream.
public abstract int available() throws IOException; * @throws IOException if an error occurred while closing
*/
/** {@inheritDoc} */
public abstract int read() throws IOException;
/** {@inheritDoc} */
public abstract int read(byte[] b, int off, int len) throws IOException;
/** {@inheritDoc} */
public abstract void close() throws IOException; public abstract void close() throws IOException;
/**
* Read an operation from the stream
* @return an operation from the stream or null if at end of stream
* @throws IOException if there is an error reading from the stream
*/
public abstract FSEditLogOp readOp() throws IOException;
/**
* Get the layout version of the data in the stream.
* @return the layout version of the ops in the stream.
* @throws IOException if there is an error reading the version
*/
public abstract int getVersion() throws IOException;
/**
* Get the "position" of in the stream. This is useful for
* debugging and operational purposes.
*
* Different stream types can have a different meaning for
* what the position is. For file streams it means the byte offset
* from the start of the file.
*
* @return the position in the stream
*/
public abstract long getPosition();
/** /**
* Return the size of the current edits log. * Return the size of the current edits log.
*/ */
abstract long length() throws IOException; abstract long length() throws IOException;
/**
* Return DataInputStream based on this edit stream.
*/
DataInputStream getDataInputStream() {
return new DataInputStream(new BufferedInputStream(this));
}
} }

View File

@ -20,8 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.zip.Checksum;
import java.util.zip.CheckedOutputStream;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -44,7 +42,6 @@
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.PureJavaCrc32;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -116,18 +113,6 @@ private enum State {
private NNStorage storage; private NNStorage storage;
private static ThreadLocal<Checksum> localChecksum =
new ThreadLocal<Checksum>() {
protected Checksum initialValue() {
return new PureJavaCrc32();
}
};
/** Get a thread local checksum */
public static Checksum getChecksum() {
return localChecksum.get();
}
private static class TransactionId { private static class TransactionId {
public long txid; public long txid;

View File

@ -37,7 +37,8 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader; import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream.LogHeaderCorruptException;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@ -60,6 +61,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.io.IOUtils;
public class FSEditLogLoader { public class FSEditLogLoader {
private final FSNamesystem fsNamesys; private final FSNamesystem fsNamesys;
@ -84,29 +86,25 @@ int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
} }
int loadFSEdits(EditLogInputStream edits, boolean closeOnExit, int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
long expectedStartingTxId) long expectedStartingTxId)
throws IOException { throws IOException {
BufferedInputStream bin = new BufferedInputStream(edits);
DataInputStream in = new DataInputStream(bin);
int numEdits = 0; int numEdits = 0;
int logVersion = edits.getVersion();
try { try {
LogHeader header = LogHeader.read(in); numEdits = loadEditRecords(logVersion, edits, false,
numEdits = loadEditRecords( expectedStartingTxId);
header.logVersion, in, header.checksum, false,
expectedStartingTxId);
} finally { } finally {
if(closeOnExit) if(closeOnExit) {
in.close(); edits.close();
}
} }
return numEdits; return numEdits;
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
int loadEditRecords(int logVersion, DataInputStream in, int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
Checksum checksum, boolean closeOnExit,
long expectedStartingTxId) long expectedStartingTxId)
throws IOException { throws IOException {
FSDirectory fsDir = fsNamesys.dir; FSDirectory fsDir = fsNamesys.dir;
@ -123,10 +121,6 @@ int loadEditRecords(int logVersion, DataInputStream in,
fsNamesys.writeLock(); fsNamesys.writeLock();
fsDir.writeLock(); fsDir.writeLock();
// Keep track of the file offsets of the last several opcodes.
// This is handy when manually recovering corrupted edits files.
PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
long recentOpcodeOffsets[] = new long[4]; long recentOpcodeOffsets[] = new long[4];
Arrays.fill(recentOpcodeOffsets, -1); Arrays.fill(recentOpcodeOffsets, -1);
@ -134,12 +128,10 @@ int loadEditRecords(int logVersion, DataInputStream in,
long txId = expectedStartingTxId - 1; long txId = expectedStartingTxId - 1;
try { try {
FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
checksum);
FSEditLogOp op; FSEditLogOp op;
while ((op = reader.readOp()) != null) { while ((op = in.readOp()) != null) {
recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] = recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
tracker.getPos(); in.getPosition();
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
long thisTxId = op.txid; long thisTxId = op.txid;
if (thisTxId != txId + 1) { if (thisTxId != txId + 1) {
@ -421,7 +413,7 @@ int loadEditRecords(int logVersion, DataInputStream in,
// Catch Throwable because in the case of a truly corrupt edits log, any // Catch Throwable because in the case of a truly corrupt edits log, any
// sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.) // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Error replaying edit log at offset " + tracker.getPos()); sb.append("Error replaying edit log at offset " + in.getPosition());
if (recentOpcodeOffsets[0] != -1) { if (recentOpcodeOffsets[0] != -1) {
Arrays.sort(recentOpcodeOffsets); Arrays.sort(recentOpcodeOffsets);
sb.append("\nRecent opcode offsets:"); sb.append("\nRecent opcode offsets:");
@ -480,49 +472,50 @@ private void check203UpgradeFailure(int logVersion, IOException ex)
} }
} }
static EditLogValidation validateEditLog(File file) throws IOException {
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
} catch (LogHeaderCorruptException corrupt) {
// If it's missing its header, this is equivalent to no transactions
FSImage.LOG.warn("Log at " + file + " has no valid header",
corrupt);
return new EditLogValidation(0, 0);
}
try {
return validateEditLog(in);
} finally {
IOUtils.closeStream(in);
}
}
/** /**
* Return the number of valid transactions in the file. If the file is * Return the number of valid transactions in the stream. If the stream is
* truncated during the header, returns a value indicating that there are * truncated during the header, returns a value indicating that there are
* 0 valid transactions. * 0 valid transactions. This reads through the stream but does not close
* @throws IOException if the file cannot be read due to an IO error (eg * it.
* @throws IOException if the stream cannot be read due to an IO error (eg
* if the log does not exist) * if the log does not exist)
*/ */
static EditLogValidation validateEditLog(File f) throws IOException { static EditLogValidation validateEditLog(EditLogInputStream in) {
FileInputStream fis = new FileInputStream(f); long numValid = 0;
long lastPos = 0;
try { try {
PositionTrackingInputStream tracker = new PositionTrackingInputStream( while (true) {
new BufferedInputStream(fis)); lastPos = in.getPosition();
DataInputStream dis = new DataInputStream(tracker); if (in.readOp() == null) {
LogHeader header; break;
try {
header = LogHeader.read(dis);
} catch (Throwable t) {
FSImage.LOG.debug("Unable to read header from " + f +
" -> no valid transactions in this file.");
return new EditLogValidation(0, 0);
}
Reader reader = new FSEditLogOp.Reader(dis, header.logVersion, header.checksum);
long numValid = 0;
long lastPos = 0;
try {
while (true) {
lastPos = tracker.getPos();
if (reader.readOp() == null) {
break;
}
numValid++;
} }
} catch (Throwable t) { numValid++;
// Catch Throwable and not just IOE, since bad edits may generate
// NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
FSImage.LOG.debug("Caught exception after reading " + numValid +
" ops from " + f + " while determining its valid length.", t);
} }
return new EditLogValidation(lastPos, numValid); } catch (Throwable t) {
} finally { // Catch Throwable and not just IOE, since bad edits may generate
fis.close(); // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
FSImage.LOG.debug("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length.", t);
} }
return new EditLogValidation(lastPos, numValid);
} }
static class EditLogValidation { static class EditLogValidation {
@ -536,9 +529,9 @@ static class EditLogValidation {
} }
/** /**
* Stream wrapper that keeps track of the current file position. * Stream wrapper that keeps track of the current stream position.
*/ */
private static class PositionTrackingInputStream extends FilterInputStream { static class PositionTrackingInputStream extends FilterInputStream {
private long curPos = 0; private long curPos = 0;
private long markPos = -1; private long markPos = -1;
@ -582,4 +575,5 @@ public long getPos() {
return curPos; return curPos;
} }
} }
} }

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -1323,71 +1324,17 @@ private static long readLongWritable(DataInputStream in) throws IOException {
return longWritable.get(); return longWritable.get();
} }
} }
/**
* Class to encapsulate the header at the top of a log file.
*/
static class LogHeader {
final int logVersion;
final Checksum checksum;
public LogHeader(int logVersion, Checksum checksum) {
this.logVersion = logVersion;
this.checksum = checksum;
}
static LogHeader read(DataInputStream in) throws IOException {
int logVersion = 0;
logVersion = FSEditLogOp.LogHeader.readLogVersion(in);
Checksum checksum = null;
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
checksum = FSEditLog.getChecksum();
}
return new LogHeader(logVersion, checksum);
}
/**
* Read the header of fsedit log
* @param in fsedit stream
* @return the edit log version number
* @throws IOException if error occurs
*/
private static int readLogVersion(DataInputStream in) throws IOException {
int logVersion = 0;
// Read log file version. Could be missing.
in.mark(4);
// If edits log is greater than 2G, available method will return negative
// numbers, so we avoid having to call available
boolean available = true;
try {
logVersion = in.readByte();
} catch (EOFException e) {
available = false;
}
if (available) {
in.reset();
logVersion = in.readInt();
if (logVersion < FSConstants.LAYOUT_VERSION) // future version
throw new IOException(
"Unexpected version of the file system log file: "
+ logVersion + ". Current version = "
+ FSConstants.LAYOUT_VERSION + ".");
}
assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
"Unsupported version " + logVersion;
return logVersion;
}
}
/** /**
* Class for writing editlog ops * Class for writing editlog ops
*/ */
public static class Writer { public static class Writer {
private final DataOutputBuffer buf; private final DataOutputBuffer buf;
private final Checksum checksum;
public Writer(DataOutputBuffer out) { public Writer(DataOutputBuffer out) {
this.buf = out; this.buf = out;
this.checksum = new PureJavaCrc32();
} }
/** /**
@ -1402,7 +1349,6 @@ public void writeOp(FSEditLogOp op) throws IOException {
buf.writeLong(op.txid); buf.writeLong(op.txid);
op.writeFields(buf); op.writeFields(buf);
int end = buf.getLength(); int end = buf.getLength();
Checksum checksum = FSEditLog.getChecksum();
checksum.reset(); checksum.reset();
checksum.update(buf.getData(), start, end-start); checksum.update(buf.getData(), start, end-start);
int sum = (int)checksum.getValue(); int sum = (int)checksum.getValue();
@ -1422,19 +1368,22 @@ public static class Reader {
* Construct the reader * Construct the reader
* @param in The stream to read from. * @param in The stream to read from.
* @param logVersion The version of the data coming from the stream. * @param logVersion The version of the data coming from the stream.
* @param checksum Checksum being used with input stream.
*/ */
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public Reader(DataInputStream in, int logVersion, public Reader(DataInputStream in, int logVersion) {
Checksum checksum) { this.logVersion = logVersion;
if (checksum != null) { if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = new PureJavaCrc32();
} else {
this.checksum = null;
}
if (this.checksum != null) {
this.in = new DataInputStream( this.in = new DataInputStream(
new CheckedInputStream(in, checksum)); new CheckedInputStream(in, this.checksum));
} else { } else {
this.in = in; this.in = in;
} }
this.logVersion = logVersion;
this.checksum = checksum;
} }
/** /**

View File

@ -588,7 +588,7 @@ long getLastTxId() {
EditLogValidation validateLog() throws IOException { EditLogValidation validateLog() throws IOException {
if (cachedValidation == null) { if (cachedValidation == null) {
cachedValidation = FSEditLogLoader.validateEditLog(file); cachedValidation = EditLogFileInputStream.validateEditLog(file);
} }
return cachedValidation; return cachedValidation;
} }

View File

@ -676,28 +676,44 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception {
private static class EditLogByteInputStream extends EditLogInputStream { private static class EditLogByteInputStream extends EditLogInputStream {
private InputStream input; private InputStream input;
private long len; private long len;
private int version;
private FSEditLogOp.Reader reader = null;
private FSEditLogLoader.PositionTrackingInputStream tracker = null;
public EditLogByteInputStream(byte[] data) { public EditLogByteInputStream(byte[] data) throws IOException {
len = data.length; len = data.length;
input = new ByteArrayInputStream(data); input = new ByteArrayInputStream(data);
}
public int available() throws IOException { BufferedInputStream bin = new BufferedInputStream(input);
return input.available(); DataInputStream in = new DataInputStream(bin);
} version = EditLogFileInputStream.readLogVersion(in);
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
public int read() throws IOException { in = new DataInputStream(tracker);
return input.read();
reader = new FSEditLogOp.Reader(in, version);
} }
@Override
public long length() throws IOException { public long length() throws IOException {
return len; return len;
} }
public int read(byte[] b, int off, int len) throws IOException { @Override
return input.read(b, off, len); public long getPosition() {
return tracker.getPos();
} }
@Override
public FSEditLogOp readOp() throws IOException {
return reader.readOp();
}
@Override
public int getVersion() throws IOException {
return version;
}
@Override
public void close() throws IOException { public void close() throws IOException {
input.close(); input.close();
} }

View File

@ -61,7 +61,7 @@ public void testPreallocation() throws IOException {
.getStorage().getStorageDir(0); .getStorage().getStorageDir(0);
File editLog = NNStorage.getInProgressEditsFile(sd, 1); File editLog = NNStorage.getInProgressEditsFile(sd, 1);
EditLogValidation validation = FSEditLogLoader.validateEditLog(editLog); EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
assertEquals("Edit log should contain a header as valid length", assertEquals("Edit log should contain a header as valid length",
HEADER_LEN, validation.validLength); HEADER_LEN, validation.validLength);
assertEquals(1, validation.numTransactions); assertEquals(1, validation.numTransactions);
@ -73,7 +73,7 @@ public void testPreallocation() throws IOException {
new FsPermission((short)777)); new FsPermission((short)777));
long oldLength = validation.validLength; long oldLength = validation.validLength;
validation = FSEditLogLoader.validateEditLog(editLog); validation = EditLogFileInputStream.validateEditLog(editLog);
assertTrue("Edit log should have more valid data after writing a txn " + assertTrue("Edit log should have more valid data after writing a txn " +
"(was: " + oldLength + " now: " + validation.validLength + ")", "(was: " + oldLength + " now: " + validation.validLength + ")",
validation.validLength > oldLength); validation.validLength > oldLength);

View File

@ -186,7 +186,7 @@ public void testCountValidTransactions() throws IOException {
// Make sure that uncorrupted log has the expected length and number // Make sure that uncorrupted log has the expected length and number
// of transactions. // of transactions.
EditLogValidation validation = FSEditLogLoader.validateEditLog(logFile); EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals(NUM_TXNS + 2, validation.numTransactions); assertEquals(NUM_TXNS + 2, validation.numTransactions);
assertEquals(validLength, validation.validLength); assertEquals(validLength, validation.validLength);
@ -202,7 +202,7 @@ public void testCountValidTransactions() throws IOException {
// Restore backup, truncate the file exactly before the txn // Restore backup, truncate the file exactly before the txn
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset); truncateFile(logFile, txOffset);
validation = FSEditLogLoader.validateEditLog(logFile); validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when truncating to length " + txOffset, assertEquals("Failed when truncating to length " + txOffset,
txid - 1, validation.numTransactions); txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength); assertEquals(txOffset, validation.validLength);
@ -211,7 +211,7 @@ public void testCountValidTransactions() throws IOException {
// also isn't valid // also isn't valid
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset + 1); truncateFile(logFile, txOffset + 1);
validation = FSEditLogLoader.validateEditLog(logFile); validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when truncating to length " + (txOffset + 1), assertEquals("Failed when truncating to length " + (txOffset + 1),
txid - 1, validation.numTransactions); txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength); assertEquals(txOffset, validation.validLength);
@ -219,7 +219,7 @@ public void testCountValidTransactions() throws IOException {
// Restore backup, corrupt the txn opcode // Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset); corruptByteInFile(logFile, txOffset);
validation = FSEditLogLoader.validateEditLog(logFile); validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when corrupting txn opcode at " + txOffset, assertEquals("Failed when corrupting txn opcode at " + txOffset,
txid - 1, validation.numTransactions); txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength); assertEquals(txOffset, validation.validLength);
@ -227,7 +227,7 @@ public void testCountValidTransactions() throws IOException {
// Restore backup, corrupt a byte a few bytes into the txn // Restore backup, corrupt a byte a few bytes into the txn
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset+5); corruptByteInFile(logFile, txOffset+5);
validation = FSEditLogLoader.validateEditLog(logFile); validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when corrupting txn data at " + (txOffset+5), assertEquals("Failed when corrupting txn data at " + (txOffset+5),
txid - 1, validation.numTransactions); txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength); assertEquals(txOffset, validation.validLength);
@ -240,7 +240,7 @@ public void testCountValidTransactions() throws IOException {
for (long offset = 0; offset < validLength; offset++) { for (long offset = 0; offset < validLength; offset++) {
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, offset); corruptByteInFile(logFile, offset);
EditLogValidation val = FSEditLogLoader.validateEditLog(logFile); EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile);
assertTrue(val.numTransactions >= prevNumValid); assertTrue(val.numTransactions >= prevNumValid);
prevNumValid = val.numTransactions; prevNumValid = val.numTransactions;
} }