HDFS-3441. Race condition between rolling logs at active NN and purging at standby. Contributed by Rakesh R.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344874 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fe6dfad79d
commit
c0dcdd67d1
@ -49,7 +49,7 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
/**
|
||||
* BookKeeper Journal Manager
|
||||
*
|
||||
@ -122,7 +122,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||
= "dfs.namenode.bookkeeperjournal.zk.session.timeout";
|
||||
public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
|
||||
|
||||
private final ZooKeeper zkc;
|
||||
private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
|
||||
|
||||
private ZooKeeper zkc;
|
||||
private final Configuration conf;
|
||||
private final BookKeeper bkc;
|
||||
private final CurrentInprogress ci;
|
||||
@ -351,13 +353,9 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
|
||||
|
||||
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
|
||||
throws IOException {
|
||||
for (EditLogLedgerMetadata l : getLedgerList()) {
|
||||
for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
|
||||
long lastTxId = l.getLastTxId();
|
||||
if (l.isInProgress()) {
|
||||
if (!inProgressOk) {
|
||||
continue;
|
||||
}
|
||||
|
||||
lastTxId = recoverLastTxId(l, false);
|
||||
}
|
||||
|
||||
@ -413,13 +411,9 @@ long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
|
||||
throws IOException {
|
||||
long count = 0;
|
||||
long expectedStart = 0;
|
||||
for (EditLogLedgerMetadata l : getLedgerList()) {
|
||||
for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
|
||||
long lastTxId = l.getLastTxId();
|
||||
if (l.isInProgress()) {
|
||||
if (!inProgressOk) {
|
||||
continue;
|
||||
}
|
||||
|
||||
lastTxId = recoverLastTxId(l, false);
|
||||
if (lastTxId == HdfsConstants.INVALID_TXID) {
|
||||
break;
|
||||
@ -457,7 +451,7 @@ public void recoverUnfinalizedSegments() throws IOException {
|
||||
try {
|
||||
List<String> children = zkc.getChildren(ledgerPath, false);
|
||||
for (String child : children) {
|
||||
if (!child.startsWith("inprogress_")) {
|
||||
if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
|
||||
continue;
|
||||
}
|
||||
String znode = ledgerPath + "/" + child;
|
||||
@ -504,9 +498,8 @@ public void recoverUnfinalizedSegments() throws IOException {
|
||||
@Override
|
||||
public void purgeLogsOlderThan(long minTxIdToKeep)
|
||||
throws IOException {
|
||||
for (EditLogLedgerMetadata l : getLedgerList()) {
|
||||
if (!l.isInProgress()
|
||||
&& l.getLastTxId() < minTxIdToKeep) {
|
||||
for (EditLogLedgerMetadata l : getLedgerList(false)) {
|
||||
if (l.getLastTxId() < minTxIdToKeep) {
|
||||
try {
|
||||
Stat stat = zkc.exists(l.getZkPath(), false);
|
||||
zkc.delete(l.getZkPath(), stat.getVersion());
|
||||
@ -597,13 +590,26 @@ private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
|
||||
/**
|
||||
* Get a list of all segments in the journal.
|
||||
*/
|
||||
private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
|
||||
List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
|
||||
throws IOException {
|
||||
List<EditLogLedgerMetadata> ledgers
|
||||
= new ArrayList<EditLogLedgerMetadata>();
|
||||
try {
|
||||
List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
|
||||
for (String n : ledgerNames) {
|
||||
ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
|
||||
for (String ledgerName : ledgerNames) {
|
||||
if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
|
||||
continue;
|
||||
}
|
||||
String legderMetadataPath = ledgerPath + "/" + ledgerName;
|
||||
try {
|
||||
EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
|
||||
.read(zkc, legderMetadataPath);
|
||||
ledgers.add(editLogLedgerMetadata);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.warn("ZNode: " + legderMetadataPath
|
||||
+ " might have finalized and deleted."
|
||||
+ " So ignoring NoNodeException.");
|
||||
}
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Exception reading ledger list from zk", e);
|
||||
@ -630,6 +636,11 @@ String inprogressZNode(long startTxid) {
|
||||
return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setZooKeeper(ZooKeeper zk) {
|
||||
this.zkc = zk;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple watcher to notify when zookeeper has connected
|
||||
*/
|
||||
|
@ -18,14 +18,17 @@
|
||||
package org.apache.hadoop.contrib.bkjournal;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import org.junit.Test;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
@ -37,6 +40,7 @@
|
||||
|
||||
import org.apache.bookkeeper.proto.BookieServer;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
|
||||
@ -614,4 +618,57 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
|
||||
bkjm.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that the edit log file meta data reading from ZooKeeper should be
|
||||
* able to handle the NoNodeException. bkjm.getInputStream(fromTxId,
|
||||
* inProgressOk) should suppress the NoNodeException and continue. HDFS-3441.
|
||||
*/
|
||||
@Test
|
||||
public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
|
||||
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
try {
|
||||
// start new inprogress log segment with txid=1
|
||||
// and write transactions till txid=50
|
||||
String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50);
|
||||
|
||||
// start new inprogress log segment with txid=51
|
||||
// and write transactions till txid=100
|
||||
String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100);
|
||||
|
||||
// read the metadata from ZK. Here simulating the situation
|
||||
// when reading,the edit log metadata can be removed by purger thread.
|
||||
ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper());
|
||||
bkjm.setZooKeeper(zkspy);
|
||||
Mockito.doThrow(
|
||||
new KeeperException.NoNodeException(zkpath2 + " doesn't exists"))
|
||||
.when(zkspy).getData(zkpath2, false, null);
|
||||
|
||||
List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false);
|
||||
assertEquals("List contains the metadata of non exists path.", 1,
|
||||
ledgerList.size());
|
||||
assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1,
|
||||
ledgerList.get(0).getZkPath());
|
||||
} finally {
|
||||
bkjm.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
|
||||
int startTxid, int endTxid) throws IOException, KeeperException,
|
||||
InterruptedException {
|
||||
EditLogOutputStream out = bkjm.startLogSegment(startTxid);
|
||||
for (long i = startTxid; i <= endTxid; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
out.write(op);
|
||||
}
|
||||
out.close();
|
||||
// finalize the inprogress_1 log segment.
|
||||
bkjm.finalizeLogSegment(startTxid, endTxid);
|
||||
String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
|
||||
assertNotNull(zkc.exists(zkpath1, false));
|
||||
assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
|
||||
return zkpath1;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user