diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ExitUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ExitUtil.java index a4af34666e..aad7ca03be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ExitUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ExitUtil.java @@ -64,6 +64,15 @@ public static ExitException getFirstExitException() { return firstExitException; } + /** + * Reset the tracking of process termination. This is for use + * in unit tests where one test in the suite expects an exit + * but others do not. + */ + public static void resetFirstExitException() { + firstExitException = null; + } + /** * Terminate the current process. Note that terminate is the *only* method * that should be used to terminate the daemon processes. @@ -103,4 +112,4 @@ public static void terminate(int status, Throwable t) throws ExitException { public static void terminate(int status) throws ExitException { terminate(status, "ExitException"); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 30e08c6eba..5cd7f66841 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -383,6 +383,9 @@ Branch-2 ( Unreleased changes ) HDFS-3276. initializeSharedEdits should have a -nonInteractive flag (todd) + HDFS-3765. namenode -initializeSharedEdits should be able to initialize + all shared storages. (Vinay and todd via todd) + OPTIMIZATIONS HDFS-2982. Startup performance suffers when there are many edit log diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java index d1a80f637a..ebbf80aa37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java @@ -18,6 +18,7 @@ package org.apache.hadoop.contrib.bkjournal; import static org.junit.Assert.*; + import org.junit.Test; import org.junit.Before; import org.junit.After; @@ -25,6 +26,9 @@ import org.junit.AfterClass; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HAUtil; @@ -35,12 +39,16 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil.ExitException; import org.apache.bookkeeper.proto.BookieServer; @@ -48,7 +56,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; /** * Integration test to ensure that the BookKeeper JournalManager @@ -67,6 +77,11 @@ public static void setupBookkeeper() throws Exception { bkutil = new BKJMUtil(numBookies); bkutil.start(); } + + @Before + public void clearExitStatus() { + ExitUtil.resetFirstExitException(); + } @AfterClass public static void teardownBookkeeper() throws Exception { @@ -244,4 +259,97 @@ public void testMultiplePrimariesStarted() throws Exception { } } } + + /** + * Use NameNode INTIALIZESHAREDEDITS to initialize the shared edits. i.e. copy + * the edits log segments to new bkjm shared edits. + * + * @throws Exception + */ + @Test + public void testInitializeBKSharedEdits() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + HAUtil.setAllowStandbyReads(conf, true); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + + MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology(); + cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology) + .numDataNodes(0).build(); + cluster.waitActive(); + // Shutdown and clear the current filebased shared dir. + cluster.shutdownNameNodes(); + File shareddir = new File(cluster.getSharedEditsDir(0, 1)); + assertTrue("Initial Shared edits dir not fully deleted", + FileUtil.fullyDelete(shareddir)); + + // Check namenodes should not start without shared dir. + assertCanNotStartNamenode(cluster, 0); + assertCanNotStartNamenode(cluster, 1); + + // Configure bkjm as new shared edits dir in both namenodes + Configuration nn1Conf = cluster.getConfiguration(0); + Configuration nn2Conf = cluster.getConfiguration(1); + nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil + .createJournalURI("/initializeSharedEdits").toString()); + nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil + .createJournalURI("/initializeSharedEdits").toString()); + BKJMUtil.addJournalManagerDefinition(nn1Conf); + BKJMUtil.addJournalManagerDefinition(nn2Conf); + + // Initialize the BKJM shared edits. + assertFalse(NameNode.initializeSharedEdits(nn1Conf)); + + // NameNode should be able to start and should be in sync with BKJM as + // shared dir + assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize"); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) { + try { + cluster.restartNameNode(nnIndex, false); + fail("Should not have been able to start NN" + (nnIndex) + + " without shared dir"); + } catch (IOException ioe) { + LOG.info("Got expected exception", ioe); + GenericTestUtils.assertExceptionContains( + "Cannot start an HA namenode with name dirs that need recovery", ioe); + } + } + + private void assertCanStartHANameNodes(MiniDFSCluster cluster, + Configuration conf, String path) throws ServiceFailedException, + IOException, URISyntaxException, InterruptedException { + // Now should be able to start both NNs. Pass "false" here so that we don't + // try to waitActive on all NNs, since the second NN doesn't exist yet. + cluster.restartNameNode(0, false); + cluster.restartNameNode(1, true); + + // Make sure HA is working. + cluster + .getNameNode(0) + .getRpcServer() + .transitionToActive( + new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER)); + FileSystem fs = null; + try { + Path newPath = new Path(path); + fs = HATestUtil.configureFailoverFs(cluster, conf); + assertTrue(fs.mkdirs(newPath)); + HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), + cluster.getNameNode(1)); + assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), + newPath.toString(), false).isDir()); + } finally { + if (fs != null) { + fs.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 38c8415165..20e7aafba9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -17,18 +17,12 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Iterator; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,10 +36,10 @@ import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Trash; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -53,9 +47,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; -import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; -import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState; import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; @@ -68,8 +59,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -88,6 +77,7 @@ import static org.apache.hadoop.util.ToolRunner.confirmPrompt; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /********************************************************** @@ -767,9 +757,18 @@ private static boolean initializeSharedEdits(Configuration conf, String nsId = DFSUtil.getNamenodeNameServiceId(conf); String namenodeId = HAUtil.getNameNodeId(conf, nsId); initializeGenericKeys(conf, nsId, namenodeId); + + if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) { + LOG.fatal("No shared edits directory configured for namespace " + + nsId + " namenode " + namenodeId); + return false; + } + NNStorage existingStorage = null; try { - FSNamesystem fsns = FSNamesystem.loadFromDisk(conf, + Configuration confWithoutShared = new Configuration(conf); + confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY); + FSNamesystem fsns = FSNamesystem.loadFromDisk(confWithoutShared, FSNamesystem.getNamespaceDirs(conf), FSNamesystem.getNamespaceEditsDirs(conf, false)); @@ -799,11 +798,9 @@ private static boolean initializeSharedEdits(Configuration conf, fsns.getFSImage().getEditLog().close(); fsns.getFSImage().getEditLog().initJournalsForWrite(); fsns.getFSImage().getEditLog().recoverUnclosedStreams(); - - if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, - newSharedStorage, conf)) { - return true; // aborted - } + + copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage, + conf); } catch (IOException ioe) { LOG.error("Could not initialize shared edits dir", ioe); return true; // aborted @@ -821,43 +818,59 @@ private static boolean initializeSharedEdits(Configuration conf, } return false; // did not abort } - - private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns, + + private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns, Collection sharedEditsDirs, NNStorage newSharedStorage, - Configuration conf) throws FileNotFoundException, IOException { + Configuration conf) throws IOException { + Preconditions.checkArgument(!sharedEditsDirs.isEmpty(), + "No shared edits specified"); // Copy edit log segments into the new shared edits dir. - for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) { - FileJournalManager fjm = null; - if (!(jas.getManager() instanceof FileJournalManager)) { - LOG.error("Cannot populate shared edits dir from non-file " + - "journal manager: " + jas.getManager()); - return true; // aborted - } else { - fjm = (FileJournalManager) jas.getManager(); - } - for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage() - .getMostRecentCheckpointTxId())) { - File editLogSegment = elf.getFile(); - for (URI sharedEditsUri : sharedEditsDirs) { - StorageDirectory sharedEditsDir = newSharedStorage - .getStorageDirectory(sharedEditsUri); - File targetFile = new File(sharedEditsDir.getCurrentDir(), - editLogSegment.getName()); - if (!targetFile.exists()) { - InputStream in = null; - OutputStream out = null; - try { - in = new FileInputStream(editLogSegment); - out = new AtomicFileOutputStream(targetFile); - IOUtils.copyBytes(in, out, conf); - } finally { - IOUtils.cleanup(LOG, in, out); - } - } + List sharedEditsUris = new ArrayList(sharedEditsDirs); + FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage, + sharedEditsUris); + newSharedEditLog.initJournalsForWrite(); + newSharedEditLog.recoverUnclosedStreams(); + + FSEditLog sourceEditLog = fsns.getFSImage().editLog; + + long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId(); + Collection streams = sourceEditLog.selectInputStreams( + fromTxId+1, 0); + + // Set the nextTxid to the CheckpointTxId+1 + newSharedEditLog.setNextTxId(fromTxId + 1); + + // Copy all edits after last CheckpointTxId to shared edits dir + for (EditLogInputStream stream : streams) { + LOG.debug("Beginning to copy stream " + stream + " to shared edits"); + FSEditLogOp op; + boolean segmentOpen = false; + while ((op = stream.readOp()) != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("copying op: " + op); + } + if (!segmentOpen) { + newSharedEditLog.startLogSegment(op.txid, false); + segmentOpen = true; + } + + newSharedEditLog.logEdit(op); + + if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) { + newSharedEditLog.logSync(); + newSharedEditLog.endCurrentLogSegment(false); + LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream); + segmentOpen = false; } } + + if (segmentOpen) { + LOG.debug("ending log segment because of end of stream in " + stream); + newSharedEditLog.logSync(); + newSharedEditLog.endCurrentLogSegment(false); + segmentOpen = false; + } } - return false; // did not abort } private static boolean finalize(Configuration conf, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 40d7e0861a..9b87eb7497 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -65,7 +65,7 @@ public abstract class HATestUtil { * @throws CouldNotCatchUpException if the standby doesn't catch up to the * active in NN_LAG_TIMEOUT milliseconds */ - static void waitForStandbyToCatchUp(NameNode active, + public static void waitForStandbyToCatchUp(NameNode active, NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException { long activeTxId = active.getNamesystem().getFSImage().getEditLog() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java index 72110b29a8..47182d2798 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java @@ -158,6 +158,13 @@ public void testInitializeSharedEdits() throws Exception { assertCanStartHaNameNodes("2"); } + @Test + public void testFailWhenNoSharedEditsSpecified() throws Exception { + Configuration confNoShared = new Configuration(conf); + confNoShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY); + assertFalse(NameNode.initializeSharedEdits(confNoShared, true)); + } + @Test public void testDontOverWriteExistingDir() { assertFalse(NameNode.initializeSharedEdits(conf, false));