HDFS-2738. FSEditLog.selectinputStreams is reading through in-progress streams even when non-in-progress are requested. Contributed by Aaron T. Myers

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1229931 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-01-11 08:32:10 +00:00
parent a339836bbc
commit 4f1bf2fe23
12 changed files with 136 additions and 58 deletions

View File

@ -101,3 +101,5 @@ HDFS-2773. Reading edit logs from an earlier version should not leave blocks in
HDFS-2775. Fix TestStandbyCheckpoints.testBothNodesInStandbyState failing intermittently. (todd) HDFS-2775. Fix TestStandbyCheckpoints.testBothNodesInStandbyState failing intermittently. (todd)
HDFS-2766. Test for case where standby partially reads log and then performs checkpoint. (atm) HDFS-2766. Test for case where standby partially reads log and then performs checkpoint. (atm)
HDFS-2738. FSEditLog.selectinputStreams is reading through in-progress streams even when non-in-progress are requested. (atm)

View File

@ -312,8 +312,10 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
} }
} }
// TODO(HA): Handle inProgressOk
@Override @Override
public EditLogInputStream getInputStream(long fromTxnId) throws IOException { public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException {
for (EditLogLedgerMetadata l : getLedgerList()) { for (EditLogLedgerMetadata l : getLedgerList()) {
if (l.getFirstTxId() == fromTxnId) { if (l.getFirstTxId() == fromTxnId) {
try { try {
@ -329,8 +331,10 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
throw new IOException("No ledger for fromTxnId " + fromTxnId + " found."); throw new IOException("No ledger for fromTxnId " + fromTxnId + " found.");
} }
// TODO(HA): Handle inProgressOk
@Override @Override
public long getNumberOfTransactions(long fromTxnId) throws IOException { public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException {
long count = 0; long count = 0;
long expectedStart = 0; long expectedStart = 0;
for (EditLogLedgerMetadata l : getLedgerList()) { for (EditLogLedgerMetadata l : getLedgerList()) {

View File

@ -195,7 +195,7 @@ public void testNumberOfTransactions() throws Exception {
out.close(); out.close();
bkjm.finalizeLogSegment(1, 100); bkjm.finalizeLogSegment(1, 100);
long numTrans = bkjm.getNumberOfTransactions(1); long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals(100, numTrans); assertEquals(100, numTrans);
} }
@ -218,17 +218,17 @@ public void testNumberOfTransactionsWithGaps() throws Exception {
} }
zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
long numTrans = bkjm.getNumberOfTransactions(1); long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
try { try {
numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1); numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true);
fail("Should have thrown corruption exception by this point"); fail("Should have thrown corruption exception by this point");
} catch (JournalManager.CorruptionException ce) { } catch (JournalManager.CorruptionException ce) {
// if we get here, everything is going good // if we get here, everything is going good
} }
numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1); numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
} }
@ -262,7 +262,7 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
out.abort(); out.abort();
out.close(); out.close();
long numTrans = bkjm.getNumberOfTransactions(1); long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals((txid-1), numTrans); assertEquals((txid-1), numTrans);
} }
@ -357,7 +357,7 @@ public void testSimpleRead() throws Exception {
bkjm.finalizeLogSegment(1, numTransactions); bkjm.finalizeLogSegment(1, numTransactions);
EditLogInputStream in = bkjm.getInputStream(1); EditLogInputStream in = bkjm.getInputStream(1, true);
try { try {
assertEquals(numTransactions, assertEquals(numTransactions,
FSEditLogTestUtil.countTransactionsInStream(in)); FSEditLogTestUtil.countTransactionsInStream(in));

View File

@ -58,7 +58,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
} }
@Override @Override
public long getNumberOfTransactions(long fromTxnId) public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException, CorruptionException { throws IOException, CorruptionException {
// This JournalManager is never used for input. Therefore it cannot // This JournalManager is never used for input. Therefore it cannot
// return any transactions // return any transactions
@ -66,7 +66,8 @@ public long getNumberOfTransactions(long fromTxnId)
} }
@Override @Override
public EditLogInputStream getInputStream(long fromTxnId) throws IOException { public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException {
// This JournalManager is never used for input. Therefore it cannot // This JournalManager is never used for input. Therefore it cannot
// return any transactions // return any transactions
throw new IOException("Unsupported operation"); throw new IOException("Unsupported operation");

View File

@ -252,7 +252,7 @@ synchronized void openForWrite() throws IOException {
long segmentTxId = getLastWrittenTxId() + 1; long segmentTxId = getLastWrittenTxId() + 1;
// Safety check: we should never start a segment if there are // Safety check: we should never start a segment if there are
// newer txids readable. // newer txids readable.
EditLogInputStream s = journalSet.getInputStream(segmentTxId); EditLogInputStream s = journalSet.getInputStream(segmentTxId, true);
try { try {
Preconditions.checkState(s == null, Preconditions.checkState(s == null,
"Cannot start writing at txid %s when there is a stream " + "Cannot start writing at txid %s when there is a stream " +
@ -1071,19 +1071,19 @@ Collection<EditLogInputStream> selectInputStreams(long fromTxId,
public Collection<EditLogInputStream> selectInputStreams(long fromTxId, public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
long toAtLeastTxId, boolean inProgressOk) throws IOException { long toAtLeastTxId, boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
EditLogInputStream stream = journalSet.getInputStream(fromTxId); EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk);
while (stream != null) { while (stream != null) {
if (inProgressOk || !stream.isInProgress()) { streams.add(stream);
streams.add(stream);
}
// We're now looking for a higher range, so reset the fromTxId // We're now looking for a higher range, so reset the fromTxId
fromTxId = stream.getLastTxId() + 1; fromTxId = stream.getLastTxId() + 1;
stream = journalSet.getInputStream(fromTxId); stream = journalSet.getInputStream(fromTxId, inProgressOk);
} }
if (fromTxId <= toAtLeastTxId) { if (fromTxId <= toAtLeastTxId) {
closeAllStreams(streams); closeAllStreams(streams);
throw new IOException("No non-corrupt logs for txid " throw new IOException(String.format("Gap in transactions. Expected to "
+ fromTxId); + "be able to read up until at least txid %d but unable to find any "
+ "edit logs containing txid %d", toAtLeastTxId, fromTxId));
} }
return streams; return streams;
} }

View File

@ -585,9 +585,12 @@ boolean loadFSImage(FSNamesystem target) throws IOException {
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) { getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so
// we better be able to load all the edits. If we're the standby NN, it's
// OK to not be able to read all of edits right now.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1, editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
inspector.getMaxSeenTxId(), toAtLeastTxId, false);
false);
} else { } else {
editStreams = FSImagePreTransactionalStorageInspector editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage); .getEditLogStreams(storage);

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger; import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@ -192,10 +193,13 @@ static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
} }
@Override @Override
synchronized public EditLogInputStream getInputStream(long fromTxId) synchronized public EditLogInputStream getInputStream(long fromTxId,
throws IOException { boolean inProgressOk) throws IOException {
for (EditLogFile elf : getLogFiles(fromTxId)) { for (EditLogFile elf : getLogFiles(fromTxId)) {
if (elf.containsTxId(fromTxId)) { if (elf.containsTxId(fromTxId)) {
if (!inProgressOk && elf.isInProgress()) {
continue;
}
if (elf.isInProgress()) { if (elf.isInProgress()) {
elf.validateLog(); elf.validateLog();
} }
@ -219,7 +223,7 @@ synchronized public EditLogInputStream getInputStream(long fromTxId)
} }
@Override @Override
public long getNumberOfTransactions(long fromTxId) public long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
throws IOException, CorruptionException { throws IOException, CorruptionException {
long numTxns = 0L; long numTxns = 0L;
@ -232,6 +236,10 @@ public long getNumberOfTransactions(long fromTxId)
+ fromTxId + " - " + (elf.getFirstTxId() - 1)); + fromTxId + " - " + (elf.getFirstTxId() - 1));
break; break;
} else if (elf.containsTxId(fromTxId)) { } else if (elf.containsTxId(fromTxId)) {
if (!inProgressOk && elf.isInProgress()) {
break;
}
if (elf.isInProgress()) { if (elf.isInProgress()) {
elf.validateLog(); elf.validateLog();
} }
@ -253,7 +261,7 @@ public long getNumberOfTransactions(long fromTxId)
+ " txns from " + fromTxId); + " txns from " + fromTxId);
} }
long max = findMaxTransaction(); long max = findMaxTransaction(inProgressOk);
// fromTxId should be greater than max, as it points to the next // fromTxId should be greater than max, as it points to the next
// transaction we should expect to find. If it is less than or equal // transaction we should expect to find. If it is less than or equal
@ -276,7 +284,7 @@ synchronized public void recoverUnfinalizedSegments() throws IOException {
// make sure journal is aware of max seen transaction before moving corrupt // make sure journal is aware of max seen transaction before moving corrupt
// files aside // files aside
findMaxTransaction(); findMaxTransaction(true);
for (EditLogFile elf : allLogFiles) { for (EditLogFile elf : allLogFiles) {
if (elf.getFile().equals(currentInProgress)) { if (elf.getFile().equals(currentInProgress)) {
@ -318,9 +326,13 @@ private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
* tranaction id in the case that it was the maximum transaction in * tranaction id in the case that it was the maximum transaction in
* the journal. * the journal.
*/ */
private long findMaxTransaction() private long findMaxTransaction(boolean inProgressOk)
throws IOException { throws IOException {
for (EditLogFile elf : getLogFiles(0)) { for (EditLogFile elf : getLogFiles(0)) {
if (elf.isInProgress() && !inProgressOk) {
continue;
}
if (elf.isInProgress()) { if (elf.isInProgress()) {
maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction); maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
elf.validateLog(); elf.validateLog();

View File

@ -48,20 +48,23 @@ public interface JournalManager extends Closeable {
/** /**
* Get the input stream starting with fromTxnId from this journal manager * Get the input stream starting with fromTxnId from this journal manager
* @param fromTxnId the first transaction id we want to read * @param fromTxnId the first transaction id we want to read
* @param inProgressOk whether or not in-progress streams should be returned
* @return the stream starting with transaction fromTxnId * @return the stream starting with transaction fromTxnId
* @throws IOException if a stream cannot be found. * @throws IOException if a stream cannot be found.
*/ */
EditLogInputStream getInputStream(long fromTxnId) throws IOException; EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException;
/** /**
* Get the number of transaction contiguously available from fromTxnId. * Get the number of transaction contiguously available from fromTxnId.
* *
* @param fromTxnId Transaction id to count from * @param fromTxnId Transaction id to count from
* @param inProgressOk whether or not in-progress streams should be counted
* @return The number of transactions available from fromTxnId * @return The number of transactions available from fromTxnId
* @throws IOException if the journal cannot be read. * @throws IOException if the journal cannot be read.
* @throws CorruptionException if there is a gap in the journal at fromTxnId. * @throws CorruptionException if there is a gap in the journal at fromTxnId.
*/ */
long getNumberOfTransactions(long fromTxnId) long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException, CorruptionException; throws IOException, CorruptionException;
/** /**

View File

@ -198,7 +198,8 @@ public void apply(JournalAndStream jas) throws IOException {
* or null if no more exist * or null if no more exist
*/ */
@Override @Override
public EditLogInputStream getInputStream(long fromTxnId) throws IOException { public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException {
JournalManager bestjm = null; JournalManager bestjm = null;
long bestjmNumTxns = 0; long bestjmNumTxns = 0;
CorruptionException corruption = null; CorruptionException corruption = null;
@ -209,7 +210,8 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
JournalManager candidate = jas.getManager(); JournalManager candidate = jas.getManager();
long candidateNumTxns = 0; long candidateNumTxns = 0;
try { try {
candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId); candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId,
inProgressOk);
} catch (CorruptionException ce) { } catch (CorruptionException ce) {
corruption = ce; corruption = ce;
} catch (IOException ioe) { } catch (IOException ioe) {
@ -232,18 +234,20 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
return null; return null;
} }
} }
return bestjm.getInputStream(fromTxnId); return bestjm.getInputStream(fromTxnId, inProgressOk);
} }
@Override @Override
public long getNumberOfTransactions(long fromTxnId) throws IOException { public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException {
long num = 0; long num = 0;
for (JournalAndStream jas: journals) { for (JournalAndStream jas: journals) {
if (jas.isDisabled()) { if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled"); LOG.info("Skipping jas " + jas + " since it's disabled");
continue; continue;
} else { } else {
long newNum = jas.getManager().getNumberOfTransactions(fromTxnId); long newNum = jas.getManager().getNumberOfTransactions(fromTxnId,
inProgressOk);
if (newNum > num) { if (newNum > num) {
num = newNum; num = newNum;
} }

View File

@ -936,11 +936,11 @@ static class AbortSpec {
* *
* @param editUris directories to create edit logs in * @param editUris directories to create edit logs in
* @param numrolls number of times to roll the edit log during setup * @param numrolls number of times to roll the edit log during setup
* @param closeOnFinish whether to close the edit log after setup
* @param abortAtRolls Specifications for when to fail, see AbortSpec * @param abortAtRolls Specifications for when to fail, see AbortSpec
*/ */
public static NNStorage setupEdits(List<URI> editUris, int numrolls, public static NNStorage setupEdits(List<URI> editUris, int numrolls,
AbortSpec... abortAtRolls) boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException {
throws IOException {
List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls)); List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
NNStorage storage = new NNStorage(new Configuration(), NNStorage storage = new NNStorage(new Configuration(),
Collections.<URI>emptyList(), Collections.<URI>emptyList(),
@ -979,16 +979,34 @@ public static NNStorage setupEdits(List<URI> editUris, int numrolls,
} }
editlog.logSync(); editlog.logSync();
} }
editlog.close();
if (closeOnFinish) {
editlog.close();
}
FSImageTestUtil.logStorageContents(LOG, storage); FSImageTestUtil.logStorageContents(LOG, storage);
return storage; return storage;
} }
/**
* Set up directories for tests.
*
* Each rolled file is 10 txns long.
* A failed file is 2 txns long.
*
* @param editUris directories to create edit logs in
* @param numrolls number of times to roll the edit log during setup
* @param abortAtRolls Specifications for when to fail, see AbortSpec
*/
public static NNStorage setupEdits(List<URI> editUris, int numrolls,
AbortSpec... abortAtRolls) throws IOException {
return setupEdits(editUris, numrolls, true, abortAtRolls);
}
/** /**
* Test loading an editlog which has had both its storage fail * Test loading an editlog which has had both its storage fail
* on alternating rolls. Two edit log directories are created. * on alternating rolls. Two edit log directories are created.
* The first on fails on odd rolls, the second on even. Test * The first one fails on odd rolls, the second on even. Test
* that we are able to load the entire editlog regardless. * that we are able to load the entire editlog regardless.
*/ */
@Test @Test

View File

@ -60,7 +60,7 @@ public void testNormalOperation() throws IOException {
long numJournals = 0; long numJournals = 0;
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) { for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
numJournals++; numJournals++;
} }
assertEquals(3, numJournals); assertEquals(3, numJournals);
@ -81,7 +81,7 @@ public void testInprogressRecovery() throws IOException {
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL,
jm.getNumberOfTransactions(1)); jm.getNumberOfTransactions(1, true));
} }
/** /**
@ -103,15 +103,16 @@ public void testInprogressRecoveryMixed() throws IOException {
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS); Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = dirs.next(); StorageDirectory sd = dirs.next();
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd); jm = new FileJournalManager(sd);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
true));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd); jm = new FileJournalManager(sd);
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
} }
/** /**
@ -135,15 +136,18 @@ public void testInprogressRecoveryAll() throws IOException {
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS); Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = dirs.next(); StorageDirectory sd = dirs.next();
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
true));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd); jm = new FileJournalManager(sd);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
true));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd); jm = new FileJournalManager(sd);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
true));
} }
/** /**
@ -174,15 +178,15 @@ public void testReadFromStream() throws IOException {
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL; long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1)); assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true));
long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files
long startingTxId = skippedTxns + 1; long startingTxId = skippedTxns + 1;
long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId); long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId, true);
long numLoaded = 0; long numLoaded = 0;
while (numLoaded < numTransactionsToLoad) { while (numLoaded < numTransactionsToLoad) {
EditLogInputStream editIn = jm.getInputStream(startingTxId); EditLogInputStream editIn = jm.getInputStream(startingTxId, true);
FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn); FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn);
long count = val.getNumTransactions(); long count = val.getNumTransactions();
@ -212,7 +216,8 @@ public void testAskForTransactionsMidfile() throws IOException {
// 10 rolls, so 11 rolled files, 110 txids total. // 10 rolls, so 11 rolled files, 110 txids total.
final int TOTAL_TXIDS = 10 * 11; final int TOTAL_TXIDS = 10 * 11;
for (int txid = 1; txid <= TOTAL_TXIDS; txid++) { for (int txid = 1; txid <= TOTAL_TXIDS; txid++) {
assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid)); assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid,
true));
} }
} }
@ -244,10 +249,10 @@ public boolean accept(File dir, String name) {
assertTrue(files[0].delete()); assertTrue(files[0].delete());
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1)); assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true));
try { try {
jm.getNumberOfTransactions(startGapTxId); jm.getNumberOfTransactions(startGapTxId, true);
fail("Should have thrown an exception by now"); fail("Should have thrown an exception by now");
} catch (IOException ioe) { } catch (IOException ioe) {
assertTrue(true); assertTrue(true);
@ -255,7 +260,7 @@ public boolean accept(File dir, String name) {
// rolled 10 times so there should be 11 files. // rolled 10 times so there should be 11 files.
assertEquals(11*TXNS_PER_ROLL - endGapTxId, assertEquals(11*TXNS_PER_ROLL - endGapTxId,
jm.getNumberOfTransactions(endGapTxId+1)); jm.getNumberOfTransactions(endGapTxId + 1, true));
} }
/** /**
@ -282,7 +287,7 @@ public boolean accept(File dir, String name) {
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
assertEquals(10*TXNS_PER_ROLL+1, assertEquals(10*TXNS_PER_ROLL+1,
jm.getNumberOfTransactions(1)); jm.getNumberOfTransactions(1, true));
} }
@Test @Test
@ -323,11 +328,37 @@ public void testReadFromMiddleOfEditLog() throws CorruptionException,
FileJournalManager jm = new FileJournalManager(sd); FileJournalManager jm = new FileJournalManager(sd);
EditLogInputStream elis = jm.getInputStream(5); EditLogInputStream elis = jm.getInputStream(5, true);
FSEditLogOp op = elis.readOp(); FSEditLogOp op = elis.readOp();
assertEquals("read unexpected op", op.getTransactionId(), 5); assertEquals("read unexpected op", op.getTransactionId(), 5);
} }
/**
* Make sure that in-progress streams aren't counted if we don't ask for
* them.
*/
@Test
public void testExcludeInProgressStreams() throws CorruptionException,
IOException {
File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2");
// Don't close the edit log once the files have been set up.
NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
10, false);
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd);
// If we exclude the in-progess stream, we should only have 100 tx.
assertEquals(100, jm.getNumberOfTransactions(1, false));
EditLogInputStream elis = jm.getInputStream(90, false);
FSEditLogOp lastReadOp = null;
while ((lastReadOp = elis.readOp()) != null) {
assertTrue(lastReadOp.getTransactionId() <= 100);
}
}
private static String getLogsAsString( private static String getLogsAsString(
FileJournalManager fjm, long firstTxId) throws IOException { FileJournalManager fjm, long firstTxId) throws IOException {
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId)); return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId));

View File

@ -144,13 +144,13 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
} }
@Override @Override
public EditLogInputStream getInputStream(long fromTxnId) public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException { throws IOException {
return null; return null;
} }
@Override @Override
public long getNumberOfTransactions(long fromTxnId) public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException { throws IOException {
return 0; return 0;
} }