HDFS-14674. [SBN read] Got an unexpected txid when tail editlog. Contributed by wangzhaohui.
This commit is contained in:
parent
d699022fce
commit
ebef99dcf4
@ -894,6 +894,7 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
prog.beginPhase(Phase.LOADING_EDITS);
|
||||
|
||||
long prevLastAppliedTxId = lastAppliedTxId;
|
||||
long remainingReadTxns = maxTxnsToRead;
|
||||
try {
|
||||
FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
|
||||
|
||||
@ -910,8 +911,8 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
(lastAppliedTxId + 1) + logSuppressed);
|
||||
}
|
||||
try {
|
||||
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
|
||||
startOpt, recovery);
|
||||
remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
|
||||
remainingReadTxns, startOpt, recovery);
|
||||
} finally {
|
||||
// Update lastAppliedTxId even in case of error, since some ops may
|
||||
// have been successfully applied before the error.
|
||||
@ -922,6 +923,9 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
&& recovery != null) {
|
||||
lastAppliedTxId = editIn.getLastTxId();
|
||||
}
|
||||
if (remainingReadTxns <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
FSEditLog.closeAllStreams(editStreams);
|
||||
|
@ -307,6 +307,87 @@ private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
|
||||
return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiStreamsLoadEditWithConfMaxTxns()
|
||||
throws IOException {
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSystem = null;
|
||||
FSImage writeFsImage = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(conf)
|
||||
.numDataNodes(NUM_DATA_NODES)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
fileSystem = cluster.getFileSystem();
|
||||
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||
writeFsImage = namesystem.getFSImage();
|
||||
for (Iterator<URI> it = cluster.getNameDirs(0)
|
||||
.iterator(); it.hasNext();) {
|
||||
File dir = new File(it.next().getPath());
|
||||
System.out.println(dir);
|
||||
}
|
||||
// Roll log so new output buffer size takes effect
|
||||
// we should now be writing to edits_inprogress_3
|
||||
long originalLastInodeId = namesystem.dir.getLastInodeId();
|
||||
// Reopen some files as for append
|
||||
Transactions trans = new Transactions(
|
||||
namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
||||
trans.run();
|
||||
// Roll another time to finalize edits_inprogress_3
|
||||
writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||
Transactions trans1 = new Transactions(
|
||||
namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
||||
trans1.run();
|
||||
writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||
namesystem.dir.resetLastInodeIdWithoutChecking(originalLastInodeId);
|
||||
for(Iterator<StorageDirectory> it = writeFsImage.getStorage().
|
||||
dirIterator(NameNodeDirType.EDITS); it.hasNext();){
|
||||
long expectedTxns = (2 * NUM_TRANSACTIONS) + 2;
|
||||
File editFile = NNStorage.getFinalizedEditsFile(it.next(),
|
||||
1, expectedTxns);
|
||||
File editFile1 = NNStorage.getFinalizedEditsFile(it.next(),
|
||||
203, 404);
|
||||
assertTrue("Expect " + editFile + " exists", editFile.exists());
|
||||
assertTrue("Expect " + editFile1 + " exists", editFile1.exists());
|
||||
EditLogFileInputStream editLogFileInputStream1 =
|
||||
new EditLogFileInputStream(editFile, 1, 202, false);
|
||||
EditLogFileInputStream editLogFileInputStream2 =
|
||||
new EditLogFileInputStream(editFile1, 203, 404, false);
|
||||
List<EditLogInputStream> editStreams = Lists.newArrayList();
|
||||
editStreams.add(editLogFileInputStream1);
|
||||
editStreams.add(editLogFileInputStream2);
|
||||
FSImage readFsImage = new FSImage(conf);
|
||||
try {
|
||||
readFsImage.loadEdits(editStreams, namesystem, 100, null, null);
|
||||
} catch (Exception e){
|
||||
LOG.error("There appears to be an out-of-order edit in the edit log",
|
||||
e.getMessage());
|
||||
fail("no exception should be thrown");
|
||||
} finally {
|
||||
if (readFsImage != null) {
|
||||
readFsImage.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if(fileSystem != null) {
|
||||
fileSystem.close();
|
||||
}
|
||||
if(cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
if(writeFsImage != null){
|
||||
writeFsImage.close();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Couldn't shut down cleanly", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test for writing to and rolling the edit log.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user