From 64ec449010be3e1c37ec9f1c438257348e67f1ba Mon Sep 17 00:00:00 2001
From: Uma Maheswara Rao G
Date: Fri, 25 May 2012 01:31:23 +0000
Subject: [PATCH] HDFS-3058. HA: Bring BookKeeperJournalManager up to date
 with HA changes. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1342483 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/src/contrib/bkjournal/pom.xml      |  11 +
 .../BookKeeperEditLogInputStream.java              |  24 +-
 .../bkjournal/BookKeeperJournalManager.java        |  25 +-
 .../hadoop/contrib/bkjournal/BKJMUtil.java         | 182 ++++++++++++
 .../TestBookKeeperAsHASharedDir.java               | 267 ++++++++++++++++++
 .../TestBookKeeperHACheckpoints.java               |  93 ++++++
 .../TestBookKeeperJournalManager.java              | 244 ++++------------
 .../server/namenode/FSEditLogTestUtil.java         |   5 +
 .../hdfs/server/namenode/FSEditLog.java            |   6 +-
 .../apache/hadoop/hdfs/MiniDFSCluster.java         |  24 +-
 .../namenode/ha/TestStandbyCheckpoints.java        |   6 +-
 11 files changed, 677 insertions(+), 210 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
index 380ef62377..3f2c7cab53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
@@ -54,6 +54,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
@@ -64,6 +70,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
index c8ec162479..b079c478ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -41,6 +41,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
   private final long firstTxId;
   private final long lastTxId;
   private final int logVersion;
+  private final boolean inProgress;
   private final LedgerHandle lh;
 
   private final FSEditLogOp.Reader reader;
@@ -69,6 +70,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     this.firstTxId = metadata.getFirstTxId();
     this.lastTxId = metadata.getLastTxId();
     this.logVersion = metadata.getVersion();
+    this.inProgress = metadata.isInProgress();
 
     BufferedInputStream bin = new BufferedInputStream(
         new LedgerInputStream(lh, firstBookKeeperEntry));
@@ -123,10 +125,28 @@ public String getName() {
         lh.toString(), firstTxId, lastTxId);
   }
 
-  // TODO(HA): Test this.
   @Override
   public boolean isInProgress() {
-    return true;
+    return inProgress;
+  }
+
+  /**
+   * Skip forward to specified transaction id.
+   * Currently we do this by just iterating forward.
+   * If this proves to be too expensive, this can be reimplemented
+   * with a binary search over bk entries
+   */
+  public void skipTo(long txId) throws IOException {
+    long numToSkip = getFirstTxId() - txId;
+
+    FSEditLogOp op = null;
+    for (long i = 0; i < numToSkip; i++) {
+      op = readOp();
+    }
+    if (op != null && op.getTransactionId() != txId-1) {
+      throw new IOException("Corrupt stream, expected txid "
+          + (txId-1) + ", got " + op.getTransactionId());
+    }
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index 6a3bfbd651..1e89513327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -233,11 +233,14 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
        */
       l.write(zkc, znodePath);
 
+      maxTxId.store(txId);
       return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
     } catch (Exception e) {
       if (currentLedger != null) {
         try {
+          long id = currentLedger.getId();
           currentLedger.close();
+          bkc.deleteLedger(id);
         } catch (Exception e2) {
           //log & ignore, an IOException will be thrown soon
           LOG.error("Error closing ledger", e2);
@@ -384,8 +387,8 @@ long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
           break;
         }
       }
-      count += (l.getLastTxId() - l.getFirstTxId()) + 1;
-      expectedStart = l.getLastTxId() + 1;
+      count += (lastTxId - l.getFirstTxId()) + 1;
+      expectedStart = lastTxId + 1;
     }
   }
   return count;
@@ -404,7 +407,7 @@ public void recoverUnfinalizedSegments() throws IOException {
         String znode = ledgerPath + "/" + child;
         EditLogLedgerMetadata l
           = EditLogLedgerMetadata.read(zkc, znode);
-        long endTxId = recoverLastTxId(l);
+        long endTxId = recoverLastTxId(l, true);
         if (endTxId == HdfsConstants.INVALID_TXID) {
           LOG.error("Unrecoverable corruption has occurred in segment "
                     + l.toString() + " at path " + znode
@@ -474,11 +477,19 @@ public void setOutputBufferCapacity(int size) {
    * Find the id of the last edit log transaction written to an edit log
    * ledger.
   */
-  private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
+  private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
+      throws IOException {
     try {
-      LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
-                                       BookKeeper.DigestType.MAC,
-                                       digestpw.getBytes());
+      LedgerHandle lh = null;
+      if (fence) {
+        lh = bkc.openLedger(l.getLedgerId(),
+                            BookKeeper.DigestType.MAC,
+                            digestpw.getBytes());
+      } else {
+        lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
+                                      BookKeeper.DigestType.MAC,
+                                      digestpw.getBytes());
+      }
       long lastAddConfirmed = lh.getLastAddConfirmed();
       BookKeeperEditLogInputStream in
         = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
new file mode 100644
index 0000000000..32b0583c94
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.contrib.bkjournal; + +import static org.junit.Assert.*; + +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.KeeperException; + +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.util.LocalBookKeeper; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.List; + +import java.io.IOException; +import java.io.File; + +/** + * Utility class for setting up bookkeeper ensembles + * and bringing individual bookies up and down + */ +class BKJMUtil { + protected static final Log LOG = LogFactory.getLog(BKJMUtil.class); + + int nextPort = 6000; // next port for additionally created bookies + private Thread bkthread = null; + private final static String zkEnsemble = "127.0.0.1:2181"; + int numBookies; + + BKJMUtil(final int numBookies) throws Exception { + this.numBookies = numBookies; + + bkthread = new Thread() { + public void run() { + try { + String[] args = new String[1]; + args[0] = String.valueOf(numBookies); + LOG.info("Starting bk"); + LocalBookKeeper.main(args); + } catch (InterruptedException e) { + // go away quietly + } catch (Exception e) { + LOG.error("Error starting local bk", e); + } + } + }; + } + + void start() throws Exception { + bkthread.start(); + if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) { + throw new Exception("Error starting zookeeper/bookkeeper"); + } + assertEquals("Not all bookies started", + numBookies, checkBookiesUp(numBookies, 10)); + } + + void teardown() throws Exception { + if (bkthread != null) { + bkthread.interrupt(); + bkthread.join(); + } + } + + static ZooKeeper connectZooKeeper() + throws IOException, KeeperException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() { + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + latch.countDown(); + } + } + }); + if (!latch.await(3, TimeUnit.SECONDS)) { + throw new IOException("Zookeeper took too long to connect"); + } + return zkc; + } + + static URI createJournalURI(String path) throws Exception { + return URI.create("bookkeeper://" + zkEnsemble + path); + } + + static void addJournalManagerDefinition(Configuration conf) { + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper", + "org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager"); + } + + BookieServer newBookie() throws Exception { + int port = nextPort++; + ServerConfiguration bookieConf = new ServerConfiguration(); + bookieConf.setBookiePort(port); + File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_", + "test"); + tmpdir.delete(); + tmpdir.mkdir(); + + bookieConf.setZkServers(zkEnsemble); + bookieConf.setJournalDirName(tmpdir.getPath()); + bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() }); + + BookieServer b = new BookieServer(bookieConf); + b.start(); + for (int i = 0; i < 10 && !b.isRunning(); i++) { + Thread.sleep(10000); + } + if (!b.isRunning()) { + throw new IOException("Bookie would not start"); + } + return b; + } + + /** + * Check that a number of bookies 
are available + * @param count number of bookies required + * @param timeout number of seconds to wait for bookies to start + * @throws IOException if bookies are not started by the time the timeout hits + */ + int checkBookiesUp(int count, int timeout) throws Exception { + ZooKeeper zkc = connectZooKeeper(); + try { + boolean up = false; + int mostRecentSize = 0; + for (int i = 0; i < timeout; i++) { + try { + List children = zkc.getChildren("/ledgers/available", + false); + mostRecentSize = children.size(); + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + mostRecentSize + " bookies up, " + + "waiting for " + count); + if (LOG.isTraceEnabled()) { + for (String child : children) { + LOG.trace(" server: " + child); + } + } + } + if (mostRecentSize == count) { + up = true; + break; + } + } catch (KeeperException e) { + // ignore + } + Thread.sleep(1000); + } + return mostRecentSize; + } finally { + zkc.close(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java new file mode 100644 index 0000000000..c313cd1229 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.contrib.bkjournal; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.AfterClass; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; + +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.DFSTestUtil; + +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; + +import org.apache.hadoop.ipc.RemoteException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.bookkeeper.proto.BookieServer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; + +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.verify; + +/** + * Integration test to ensure that the BookKeeper JournalManager + * works for HDFS Namenode HA + */ +public class TestBookKeeperAsHASharedDir { + static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class); + + private static BKJMUtil bkutil; + static int numBookies = 3; + + private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager"; + + @BeforeClass + public static void setupBookkeeper() throws Exception { + bkutil = new BKJMUtil(numBookies); + bkutil.start(); + } + + @AfterClass + public static void teardownBookkeeper() throws Exception { + bkutil.teardown(); + } + + /** + * Test simple HA failover usecase with BK + */ + @Test + public void testFailoverWithBK() throws Exception { + Runtime mockRuntime1 = mock(Runtime.class); + Runtime mockRuntime2 = mock(Runtime.class); + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, + BKJMUtil.createJournalURI("/hotfailover").toString()); + BKJMUtil.addJournalManagerDefinition(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .manageNameDfsSharedDirs(false) + .build(); + NameNode nn1 = cluster.getNameNode(0); + NameNode nn2 = cluster.getNameNode(1); + FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1); + FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2); + + cluster.waitActive(); + cluster.transitionToActive(0); + + Path p = new Path("/testBKJMfailover"); + + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + + fs.mkdirs(p); + cluster.shutdownNameNode(0); + + cluster.transitionToActive(1); + + assertTrue(fs.exists(p)); + } finally { + verify(mockRuntime1, times(0)).exit(anyInt()); + verify(mockRuntime2, times(0)).exit(anyInt()); + + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test HA failover, where BK, as the shared storage, fails. + * Once it becomes available again, a standby can come up. + * Verify that any write happening after the BK fail is not + * available on the standby. 
+ */ + @Test + public void testFailoverWithFailingBKCluster() throws Exception { + int ensembleSize = numBookies + 1; + BookieServer newBookie = bkutil.newBookie(); + assertEquals("New bookie didn't start", + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); + + BookieServer replacementBookie = null; + + Runtime mockRuntime1 = mock(Runtime.class); + Runtime mockRuntime2 = mock(Runtime.class); + + MiniDFSCluster cluster = null; + + try { + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, + BKJMUtil.createJournalURI("/hotfailoverWithFail").toString()); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + ensembleSize); + BKJMUtil.addJournalManagerDefinition(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .manageNameDfsSharedDirs(false) + .build(); + NameNode nn1 = cluster.getNameNode(0); + NameNode nn2 = cluster.getNameNode(1); + FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1); + FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2); + + cluster.waitActive(); + cluster.transitionToActive(0); + + Path p1 = new Path("/testBKJMFailingBKCluster1"); + Path p2 = new Path("/testBKJMFailingBKCluster2"); + + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + + fs.mkdirs(p1); + newBookie.shutdown(); // will take down shared storage + assertEquals("New bookie didn't stop", + numBookies, bkutil.checkBookiesUp(numBookies, 10)); + + // mkdirs will "succeed", but nn have called runtime.exit + fs.mkdirs(p2); + verify(mockRuntime1, atLeastOnce()).exit(anyInt()); + verify(mockRuntime2, times(0)).exit(anyInt()); + cluster.shutdownNameNode(0); + + try { + cluster.transitionToActive(1); + fail("Shouldn't have been able to transition with bookies down"); + } catch (ServiceFailedException e) { + assertTrue("Wrong exception", + e.getMessage().contains("Failed to start active services")); + } + verify(mockRuntime2, atLeastOnce()).exit(anyInt()); + + replacementBookie = bkutil.newBookie(); + assertEquals("Replacement bookie didn't start", + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); + cluster.transitionToActive(1); // should work fine now + + assertTrue(fs.exists(p1)); + assertFalse(fs.exists(p2)); + } finally { + newBookie.shutdown(); + if (replacementBookie != null) { + replacementBookie.shutdown(); + } + + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test that two namenodes can't become primary at the same + * time. 
+ */ + @Test + public void testMultiplePrimariesStarted() throws Exception { + Runtime mockRuntime1 = mock(Runtime.class); + Runtime mockRuntime2 = mock(Runtime.class); + Path p1 = new Path("/testBKJMMultiplePrimary"); + + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, + BKJMUtil.createJournalURI("/hotfailoverMultiple").toString()); + BKJMUtil.addJournalManagerDefinition(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .manageNameDfsSharedDirs(false) + .build(); + NameNode nn1 = cluster.getNameNode(0); + NameNode nn2 = cluster.getNameNode(1); + FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1); + FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2); + cluster.waitActive(); + cluster.transitionToActive(0); + + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + fs.mkdirs(p1); + nn1.getRpcServer().rollEditLog(); + try { + cluster.transitionToActive(1); + fail("Shouldn't have been able to start two primaries" + + " with single shared storage"); + } catch (ServiceFailedException sfe) { + assertTrue("Wrong exception", + sfe.getMessage().contains("Failed to start active services")); + } + } finally { + verify(mockRuntime1, times(0)).exit(anyInt()); + verify(mockRuntime2, atLeastOnce()).exit(anyInt()); + + if (cluster != null) { + cluster.shutdown(); + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java new file mode 100644 index 0000000000..cb7ba3f324 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+
+import org.junit.Before;
+import org.junit.After;
+
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+/**
+ * Runs the same tests as TestStandbyCheckpoints, but
+ * using a bookkeeper journal manager as the shared directory
+ */
+public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
+  private static BKJMUtil bkutil = null;
+  static int numBookies = 3;
+  static int journalCount = 0;
+
+  @Override
+  @Before
+  public void setupCluster() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
+             BKJMUtil.createJournalURI("/checkpointing" + journalCount++)
+             .toString());
+    BKJMUtil.addJournalManagerDefinition(conf);
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
+        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
+
+    cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(topology)
+      .numDataNodes(0)
+      .manageNameDfsSharedDirs(false)
+      .build();
+    cluster.waitActive();
+
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+    fs = HATestUtil.configureFailoverFs(cluster, conf);
+
+    cluster.transitionToActive(0);
+  }
+
+  @BeforeClass
+  public static void startBK() throws Exception {
+    journalCount = 0;
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
+  }
+
+  @AfterClass
+  public static void shutdownBK() throws Exception {
+    if (bkutil != null) {
+      bkutil.teardown();
+    }
+  }
+
+  @Override
+  public void testCheckpointCancellation() throws Exception {
+    // Overridden as the implementation in the superclass assumes that writes
+    // are to a file.
This should be fixed at some point + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index 7dbc95f25d..ddd2c468be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -18,55 +18,23 @@ package org.apache.hadoop.contrib.bkjournal; import static org.junit.Assert.*; - -import java.net.URI; -import java.util.Collections; -import java.util.Arrays; -import java.util.List; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.util.LocalBookKeeper; - -import java.io.RandomAccessFile; -import java.io.File; -import java.io.FilenameFilter; -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.security.SecurityUtil; import org.junit.Test; import org.junit.Before; import org.junit.After; import org.junit.BeforeClass; import org.junit.AfterClass; -import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; import org.apache.hadoop.hdfs.server.namenode.JournalManager; +import org.apache.bookkeeper.proto.BookieServer; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.KeeperException; - -import com.google.common.collect.ImmutableList; - -import java.util.zip.CheckedInputStream; -import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,125 +43,26 @@ public class TestBookKeeperJournalManager { static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class); private static final long DEFAULT_SEGMENT_SIZE = 1000; - private static final String zkEnsemble = "localhost:2181"; - final static private int numBookies = 5; - private static Thread bkthread; protected static Configuration conf = new Configuration(); private ZooKeeper zkc; - - - static int nextPort = 6000; // next port for additionally created bookies - - private static ZooKeeper connectZooKeeper(String ensemble) - throws IOException, KeeperException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - - ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() { - public void process(WatchedEvent event) { - if (event.getState() == 
Watcher.Event.KeeperState.SyncConnected) { - latch.countDown(); - } - } - }); - if (!latch.await(3, TimeUnit.SECONDS)) { - throw new IOException("Zookeeper took too long to connect"); - } - return zkc; - } - - private static BookieServer newBookie() throws Exception { - int port = nextPort++; - ServerConfiguration bookieConf = new ServerConfiguration(); - bookieConf.setBookiePort(port); - File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_", - "test"); - tmpdir.delete(); - tmpdir.mkdir(); - - bookieConf.setZkServers(zkEnsemble); - bookieConf.setJournalDirName(tmpdir.getPath()); - bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() }); - - BookieServer b = new BookieServer(bookieConf); - b.start(); - for (int i = 0; i < 10 && !b.isRunning(); i++) { - Thread.sleep(10000); - } - if (!b.isRunning()) { - throw new IOException("Bookie would not start"); - } - return b; - } - - /** - * Check that a number of bookies are available - * @param count number of bookies required - * @param timeout number of seconds to wait for bookies to start - * @throws IOException if bookies are not started by the time the timeout hits - */ - private static int checkBookiesUp(int count, int timeout) throws Exception { - ZooKeeper zkc = connectZooKeeper(zkEnsemble); - try { - boolean up = false; - int mostRecentSize = 0; - for (int i = 0; i < timeout; i++) { - try { - List children = zkc.getChildren("/ledgers/available", - false); - mostRecentSize = children.size(); - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + mostRecentSize + " bookies up, " - + "waiting for " + count); - if (LOG.isTraceEnabled()) { - for (String child : children) { - LOG.trace(" server: " + child); - } - } - } - if (mostRecentSize == count) { - up = true; - break; - } - } catch (KeeperException e) { - // ignore - } - Thread.sleep(1000); - } - return mostRecentSize; - } finally { - zkc.close(); - } - } + private static BKJMUtil bkutil; + static int numBookies = 3; @BeforeClass public static void setupBookkeeper() throws Exception { - bkthread = new Thread() { - public void run() { - try { - String[] args = new String[1]; - args[0] = String.valueOf(numBookies); - LOG.info("Starting bk"); - LocalBookKeeper.main(args); - } catch (InterruptedException e) { - // go away quietly - } catch (Exception e) { - LOG.error("Error starting local bk", e); - } - } - }; - bkthread.start(); - - if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) { - throw new Exception("Error starting zookeeper/bookkeeper"); - } - assertEquals("Not all bookies started", - numBookies, checkBookiesUp(numBookies, 10)); + bkutil = new BKJMUtil(numBookies); + bkutil.start(); } - + + @AfterClass + public static void teardownBookkeeper() throws Exception { + bkutil.teardown(); + } + @Before public void setup() throws Exception { - zkc = connectZooKeeper(zkEnsemble); + zkc = BKJMUtil.connectZooKeeper(); } @After @@ -201,18 +70,10 @@ public void teardown() throws Exception { zkc.close(); } - @AfterClass - public static void teardownBookkeeper() throws Exception { - if (bkthread != null) { - bkthread.interrupt(); - bkthread.join(); - } - } - @Test public void testSimpleWrite() throws Exception { BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite")); + BKJMUtil.createJournalURI("/hdfsjournal-simplewrite")); long txid = 1; EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { @@ -231,8 +92,8 @@ public void testSimpleWrite() throws 
Exception { @Test public void testNumberOfTransactions() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-txncount")); long txid = 1; EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { @@ -249,8 +110,8 @@ public void testNumberOfTransactions() throws Exception { @Test public void testNumberOfTransactionsWithGaps() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-gaps")); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; @@ -262,9 +123,11 @@ public void testNumberOfTransactionsWithGaps() throws Exception { } out.close(); bkjm.finalizeLogSegment(start, txid-1); - assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); + assertNotNull( + zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); } - zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); + zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, + DEFAULT_SEGMENT_SIZE*2), -1); long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); @@ -282,8 +145,8 @@ public void testNumberOfTransactionsWithGaps() throws Exception { @Test public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd")); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; @@ -296,7 +159,8 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { out.close(); bkjm.finalizeLogSegment(start, (txid-1)); - assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false)); + assertNotNull( + zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false)); } long start = txid; EditLogOutputStream out = bkjm.startLogSegment(start); @@ -320,8 +184,8 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { */ @Test public void testWriteRestartFrom1() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1")); long txid = 1; long start = txid; EditLogOutputStream out = bkjm.startLogSegment(txid); @@ -375,10 +239,10 @@ public void testWriteRestartFrom1() throws Exception { @Test public void testTwoWriters() throws Exception { long start = 1; - BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter")); - BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter")); + BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); + BookKeeperJournalManager bkjm2 = new 
BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); EditLogOutputStream out1 = bkjm1.startLogSegment(start); try { @@ -391,8 +255,8 @@ public void testTwoWriters() throws Exception { @Test public void testSimpleRead() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-simpleread")); long txid = 1; final long numTransactions = 10000; EditLogOutputStream out = bkjm.startLogSegment(1); @@ -416,8 +280,8 @@ public void testSimpleRead() throws Exception { @Test public void testSimpleRecovery() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery")); EditLogOutputStream out = bkjm.startLogSegment(1); long txid = 1; for (long i = 1 ; i <= 100; i++) { @@ -448,13 +312,13 @@ public void testSimpleRecovery() throws Exception { */ @Test public void testAllBookieFailure() throws Exception { - BookieServer bookieToFail = newBookie(); + BookieServer bookieToFail = bkutil.newBookie(); BookieServer replacementBookie = null; try { int ensembleSize = numBookies + 1; assertEquals("New bookie didn't start", - ensembleSize, checkBookiesUp(ensembleSize, 10)); + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); // ensure that the journal manager has to use all bookies, // so that a failure will fail the journal manager @@ -465,8 +329,7 @@ public void testAllBookieFailure() throws Exception { ensembleSize); long txid = 1; BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble - + "/hdfsjournal-allbookiefailure")); + BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure")); EditLogOutputStream out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { @@ -478,7 +341,7 @@ public void testAllBookieFailure() throws Exception { out.flush(); bookieToFail.shutdown(); assertEquals("New bookie didn't die", - numBookies, checkBookiesUp(numBookies, 10)); + numBookies, bkutil.checkBookiesUp(numBookies, 10)); try { for (long i = 1 ; i <= 3; i++) { @@ -494,10 +357,10 @@ public void testAllBookieFailure() throws Exception { assertTrue("Invalid exception message", ioe.getMessage().contains("Failed to write to bookkeeper")); } - replacementBookie = newBookie(); + replacementBookie = bkutil.newBookie(); assertEquals("New bookie didn't start", - numBookies+1, checkBookiesUp(numBookies+1, 10)); + numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10)); out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -517,7 +380,7 @@ public void testAllBookieFailure() throws Exception { } bookieToFail.shutdown(); - if (checkBookiesUp(numBookies, 30) != numBookies) { + if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { LOG.warn("Not all bookies from this test shut down, expect errors"); } } @@ -530,13 +393,13 @@ public void testAllBookieFailure() throws Exception { */ @Test public void testOneBookieFailure() throws Exception { - BookieServer bookieToFail = newBookie(); + BookieServer bookieToFail = bkutil.newBookie(); BookieServer replacementBookie = null; try { int ensembleSize = numBookies + 1; assertEquals("New 
bookie didn't start", - ensembleSize, checkBookiesUp(ensembleSize, 10)); + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); // ensure that the journal manager has to use all bookies, // so that a failure will fail the journal manager @@ -547,8 +410,7 @@ public void testOneBookieFailure() throws Exception { ensembleSize); long txid = 1; BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble - + "/hdfsjournal-onebookiefailure")); + BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure")); EditLogOutputStream out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -558,12 +420,12 @@ public void testOneBookieFailure() throws Exception { out.setReadyToFlush(); out.flush(); - replacementBookie = newBookie(); + replacementBookie = bkutil.newBookie(); assertEquals("replacement bookie didn't start", - ensembleSize+1, checkBookiesUp(ensembleSize+1, 10)); + ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10)); bookieToFail.shutdown(); assertEquals("New bookie didn't die", - ensembleSize, checkBookiesUp(ensembleSize, 10)); + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); for (long i = 1 ; i <= 3; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -581,10 +443,10 @@ public void testOneBookieFailure() throws Exception { } bookieToFail.shutdown(); - if (checkBookiesUp(numBookies, 30) != numBookies) { + if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { LOG.warn("Not all bookies from this test shut down, expect errors"); } } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java index a46f9cf0ed..0889bcdf72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java @@ -36,4 +36,9 @@ public static long countTransactionsInStream(EditLogInputStream in) FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); return (validation.getEndTxId() - in.getFirstTxId()) + 1; } + + public static void setRuntimeForEditLog(NameNode nn, Runtime rt) { + nn.setRuntimeForTesting(rt); + nn.getFSImage().getEditLog().setRuntimeForTesting(rt); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 31ce117575..ff38c9594d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -231,6 +231,10 @@ private synchronized void initJournals(List dirs) { DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT); journalSet = new JournalSet(minimumRedundantJournals); + // set runtime so we can test starting with a faulty or unavailable + // shared directory + this.journalSet.setRuntimeForTesting(runtime); + for (URI u : dirs) { boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf) .contains(u); @@ -842,7 +846,7 @@ synchronized public 
JournalSet getJournalSet() { * Used only by unit tests. */ @VisibleForTesting - synchronized void setRuntimeForTesting(Runtime runtime) { + synchronized public void setRuntimeForTesting(Runtime runtime) { this.runtime = runtime; this.journalSet.setRuntimeForTesting(runtime); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index d18553d2f7..d57f792472 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -133,6 +133,7 @@ public static class Builder { private int numDataNodes = 1; private boolean format = true; private boolean manageNameDfsDirs = true; + private boolean manageNameDfsSharedDirs = true; private boolean manageDataDfsDirs = true; private StartupOption option = null; private String[] racks = null; @@ -187,6 +188,14 @@ public Builder manageNameDfsDirs(boolean val) { return this; } + /** + * Default: true + */ + public Builder manageNameDfsSharedDirs(boolean val) { + this.manageNameDfsSharedDirs = val; + return this; + } + /** * Default: true */ @@ -288,6 +297,7 @@ private MiniDFSCluster(Builder builder) throws IOException { builder.numDataNodes, builder.format, builder.manageNameDfsDirs, + builder.manageNameDfsSharedDirs, builder.manageDataDfsDirs, builder.option, builder.racks, @@ -527,7 +537,7 @@ public MiniDFSCluster(int nameNodePort, long[] simulatedCapacities) throws IOException { this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster initMiniDFSCluster(conf, numDataNodes, format, - manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts, + manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts, simulatedCapacities, null, true, false, MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0)); } @@ -535,7 +545,8 @@ public MiniDFSCluster(int nameNodePort, private void initMiniDFSCluster( Configuration conf, int numDataNodes, boolean format, boolean manageNameDfsDirs, - boolean manageDataDfsDirs, StartupOption operation, String[] racks, + boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs, + StartupOption operation, String[] racks, String[] hosts, long[] simulatedCapacities, String clusterId, boolean waitSafeMode, boolean setupHostsFile, MiniDFSNNTopology nnTopology) @@ -574,7 +585,8 @@ private void initMiniDFSCluster( federation = nnTopology.isFederated(); createNameNodesAndSetConf( - nnTopology, manageNameDfsDirs, format, operation, clusterId, conf); + nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs, + format, operation, clusterId, conf); if (format) { if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) { @@ -595,8 +607,8 @@ private void initMiniDFSCluster( } private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, - boolean manageNameDfsDirs, boolean format, StartupOption operation, - String clusterId, + boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs, + boolean format, StartupOption operation, String clusterId, Configuration conf) throws IOException { Preconditions.checkArgument(nnTopology.countNameNodes() > 0, "empty NN topology: no namenodes specified!"); @@ -641,7 +653,7 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, if (nnIds.size() > 1) { conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner.on(",").join(nnIds)); - if (manageNameDfsDirs) { + if 
(manageNameDfsSharedDirs) {
         URI sharedEditsUri = getSharedEditsDir(nnCounter,
             nnCounter+nnIds.size()-1);
         conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 64108d0b25..3fa89105a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -54,9 +54,9 @@ public class TestStandbyCheckpoints {
 
   private static final int NUM_DIRS_IN_LOG = 200000;
 
-  private MiniDFSCluster cluster;
-  private NameNode nn0, nn1;
-  private FileSystem fs;
+  protected MiniDFSCluster cluster;
+  protected NameNode nn0, nn1;
+  protected FileSystem fs;
 
   @SuppressWarnings("rawtypes")
   @Before
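
A note on the fencing semantics that the new recoverLastTxId(l, fence) flag
relies on: opening a ledger with BookKeeper#openLedger runs ledger recovery,
which settles the last entry and permanently fences out the old writer, while
openLedgerNoRecovery (the fence=false path above) reads without disturbing an
active writer. The sketch below illustrates the distinction with the plain
BookKeeper client API; the class name, the 127.0.0.1:2181 ensemble address
(the same one the tests use) and the "password" digest are illustrative
assumptions, not part of this patch.

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;

public class FencingSketch {
  public static void main(String[] args) throws Exception {
    // Two independent clients: the "old" active NN and the recovering NN.
    BookKeeper oldWriter = new BookKeeper("127.0.0.1:2181");
    BookKeeper recoverer = new BookKeeper("127.0.0.1:2181");

    // Old active NN: starts a log segment and writes an edit.
    LedgerHandle out = oldWriter.createLedger(3, 2,
        DigestType.MAC, "password".getBytes());
    out.addEntry("edit-op-1".getBytes());

    // Recovering NN: recoverUnfinalizedSegments() -> recoverLastTxId(l, true)
    // opens the ledger *with* recovery, which fences it. From here on the
    // last entry is settled and the old writer can never extend the ledger.
    LedgerHandle in = recoverer.openLedger(out.getId(),
        DigestType.MAC, "password".getBytes());
    System.out.println("recovered up to entry " + in.getLastAddConfirmed());

    try {
      out.addEntry("edit-op-2".getBytes()); // old NN tries to keep writing...
    } catch (BKException e) {
      // ...and fails with a fencing error, so the two NNs cannot diverge.
      System.out.println("old writer fenced out: " + e);
    }

    in.close();
    oldWriter.close();
    recoverer.close();
  }
}

This is what lets a standby take over the shared edits log safely in
testFailoverWithBK even though the old active never finalized its last
segment.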