From c0dcdd67d1cc115de7887c43a6ea78c0b61a77ed Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Thu, 31 May 2012 19:29:58 +0000 Subject: [PATCH] HDFS-3441. Race condition between rolling logs at active NN and purging at standby. Contributed by Rakesh R. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344874 13f79535-47bb-0310-9956-ffa450edef68 --- .../bkjournal/BookKeeperJournalManager.java | 49 +++++++++------- .../TestBookKeeperJournalManager.java | 57 +++++++++++++++++++ 2 files changed, 87 insertions(+), 19 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 31bdc514d9..5317a0f66c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -49,7 +49,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - +import com.google.common.annotations.VisibleForTesting; /** * BookKeeper Journal Manager * @@ -122,7 +122,9 @@ public class BookKeeperJournalManager implements JournalManager { = "dfs.namenode.bookkeeperjournal.zk.session.timeout"; public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000; - private final ZooKeeper zkc; + private static final String BKJM_EDIT_INPROGRESS = "inprogress_"; + + private ZooKeeper zkc; private final Configuration conf; private final BookKeeper bkc; private final CurrentInprogress ci; @@ -351,13 +353,9 @@ public void finalizeLogSegment(long firstTxId, long lastTxId) EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk) throws IOException { - for (EditLogLedgerMetadata l : getLedgerList()) { + for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) { long lastTxId = l.getLastTxId(); if (l.isInProgress()) { - if (!inProgressOk) { - continue; - } - lastTxId = recoverLastTxId(l, false); } @@ -413,13 +411,9 @@ long getNumberOfTransactions(long fromTxId, boolean inProgressOk) throws IOException { long count = 0; long expectedStart = 0; - for (EditLogLedgerMetadata l : getLedgerList()) { + for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) { long lastTxId = l.getLastTxId(); if (l.isInProgress()) { - if (!inProgressOk) { - continue; - } - lastTxId = recoverLastTxId(l, false); if (lastTxId == HdfsConstants.INVALID_TXID) { break; @@ -457,7 +451,7 @@ public void recoverUnfinalizedSegments() throws IOException { try { List children = zkc.getChildren(ledgerPath, false); for (String child : children) { - if (!child.startsWith("inprogress_")) { + if (!child.startsWith(BKJM_EDIT_INPROGRESS)) { continue; } String znode = ledgerPath + "/" + child; @@ -504,9 +498,8 @@ public void recoverUnfinalizedSegments() throws IOException { @Override public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { - for (EditLogLedgerMetadata l : getLedgerList()) { - if (!l.isInProgress() - && l.getLastTxId() < minTxIdToKeep) { + for (EditLogLedgerMetadata l : getLedgerList(false)) { + if (l.getLastTxId() < minTxIdToKeep) { try { Stat stat = zkc.exists(l.getZkPath(), false); zkc.delete(l.getZkPath(), stat.getVersion()); @@ -597,13 +590,26 @@ private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence) /** * Get a list of all segments in the journal. */ - private List getLedgerList() throws IOException { + List getLedgerList(boolean inProgressOk) + throws IOException { List ledgers = new ArrayList(); try { List ledgerNames = zkc.getChildren(ledgerPath, false); - for (String n : ledgerNames) { - ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n)); + for (String ledgerName : ledgerNames) { + if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) { + continue; + } + String legderMetadataPath = ledgerPath + "/" + ledgerName; + try { + EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata + .read(zkc, legderMetadataPath); + ledgers.add(editLogLedgerMetadata); + } catch (KeeperException.NoNodeException e) { + LOG.warn("ZNode: " + legderMetadataPath + + " might have finalized and deleted." + + " So ignoring NoNodeException."); + } } } catch (KeeperException e) { throw new IOException("Exception reading ledger list from zk", e); @@ -630,6 +636,11 @@ String inprogressZNode(long startTxid) { return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16); } + @VisibleForTesting + void setZooKeeper(ZooKeeper zk) { + this.zkc = zk; + } + /** * Simple watcher to notify when zookeeper has connected */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index c05a3bdfb1..9476dea81c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -18,14 +18,17 @@ package org.apache.hadoop.contrib.bkjournal; import static org.junit.Assert.*; +import static org.mockito.Mockito.spy; import org.junit.Test; import org.junit.Before; import org.junit.After; import org.junit.BeforeClass; import org.junit.AfterClass; +import org.mockito.Mockito; import java.io.IOException; import java.net.URI; +import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -37,6 +40,7 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; @@ -614,4 +618,57 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { bkjm.close(); } + /** + * Tests that the edit log file meta data reading from ZooKeeper should be + * able to handle the NoNodeException. bkjm.getInputStream(fromTxId, + * inProgressOk) should suppress the NoNodeException and continue. HDFS-3441. + */ + @Test + public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception { + URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile"); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + try { + // start new inprogress log segment with txid=1 + // and write transactions till txid=50 + String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50); + + // start new inprogress log segment with txid=51 + // and write transactions till txid=100 + String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100); + + // read the metadata from ZK. Here simulating the situation + // when reading,the edit log metadata can be removed by purger thread. + ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper()); + bkjm.setZooKeeper(zkspy); + Mockito.doThrow( + new KeeperException.NoNodeException(zkpath2 + " doesn't exists")) + .when(zkspy).getData(zkpath2, false, null); + + List ledgerList = bkjm.getLedgerList(false); + assertEquals("List contains the metadata of non exists path.", 1, + ledgerList.size()); + assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1, + ledgerList.get(0).getZkPath()); + } finally { + bkjm.close(); + } + } + + private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, + int startTxid, int endTxid) throws IOException, KeeperException, + InterruptedException { + EditLogOutputStream out = bkjm.startLogSegment(startTxid); + for (long i = startTxid; i <= endTxid; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + // finalize the inprogress_1 log segment. + bkjm.finalizeLogSegment(startTxid, endTxid); + String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid); + assertNotNull(zkc.exists(zkpath1, false)); + assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false)); + return zkpath1; + } }