HDFS-3765. namenode -initializeSharedEdits should be able to initialize all shared storages. Contributed by Vinay and Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1373061 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0c27c77080
commit
b38bd555e8
@ -64,6 +64,15 @@ public static ExitException getFirstExitException() {
|
||||
return firstExitException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the tracking of process termination. This is for use
|
||||
* in unit tests where one test in the suite expects an exit
|
||||
* but others do not.
|
||||
*/
|
||||
public static void resetFirstExitException() {
|
||||
firstExitException = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate the current process. Note that terminate is the *only* method
|
||||
* that should be used to terminate the daemon processes.
|
||||
@ -103,4 +112,4 @@ public static void terminate(int status, Throwable t) throws ExitException {
|
||||
public static void terminate(int status) throws ExitException {
|
||||
terminate(status, "ExitException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -383,6 +383,9 @@ Branch-2 ( Unreleased changes )
|
||||
|
||||
HDFS-3276. initializeSharedEdits should have a -nonInteractive flag (todd)
|
||||
|
||||
HDFS-3765. namenode -initializeSharedEdits should be able to initialize
|
||||
all shared storages. (Vinay and todd via todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2982. Startup performance suffers when there are many edit log
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.contrib.bkjournal;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
@ -25,6 +26,9 @@
|
||||
import org.junit.AfterClass;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
@ -35,12 +39,16 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.util.ExitUtil.ExitException;
|
||||
|
||||
import org.apache.bookkeeper.proto.BookieServer;
|
||||
@ -48,7 +56,9 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Integration test to ensure that the BookKeeper JournalManager
|
||||
@ -67,6 +77,11 @@ public static void setupBookkeeper() throws Exception {
|
||||
bkutil = new BKJMUtil(numBookies);
|
||||
bkutil.start();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void clearExitStatus() {
|
||||
ExitUtil.resetFirstExitException();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownBookkeeper() throws Exception {
|
||||
@ -244,4 +259,97 @@ public void testMultiplePrimariesStarted() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use NameNode INTIALIZESHAREDEDITS to initialize the shared edits. i.e. copy
|
||||
* the edits log segments to new bkjm shared edits.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testInitializeBKSharedEdits() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
|
||||
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
|
||||
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
|
||||
.numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
// Shutdown and clear the current filebased shared dir.
|
||||
cluster.shutdownNameNodes();
|
||||
File shareddir = new File(cluster.getSharedEditsDir(0, 1));
|
||||
assertTrue("Initial Shared edits dir not fully deleted",
|
||||
FileUtil.fullyDelete(shareddir));
|
||||
|
||||
// Check namenodes should not start without shared dir.
|
||||
assertCanNotStartNamenode(cluster, 0);
|
||||
assertCanNotStartNamenode(cluster, 1);
|
||||
|
||||
// Configure bkjm as new shared edits dir in both namenodes
|
||||
Configuration nn1Conf = cluster.getConfiguration(0);
|
||||
Configuration nn2Conf = cluster.getConfiguration(1);
|
||||
nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
|
||||
.createJournalURI("/initializeSharedEdits").toString());
|
||||
nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
|
||||
.createJournalURI("/initializeSharedEdits").toString());
|
||||
BKJMUtil.addJournalManagerDefinition(nn1Conf);
|
||||
BKJMUtil.addJournalManagerDefinition(nn2Conf);
|
||||
|
||||
// Initialize the BKJM shared edits.
|
||||
assertFalse(NameNode.initializeSharedEdits(nn1Conf));
|
||||
|
||||
// NameNode should be able to start and should be in sync with BKJM as
|
||||
// shared dir
|
||||
assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize");
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) {
|
||||
try {
|
||||
cluster.restartNameNode(nnIndex, false);
|
||||
fail("Should not have been able to start NN" + (nnIndex)
|
||||
+ " without shared dir");
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Got expected exception", ioe);
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Cannot start an HA namenode with name dirs that need recovery", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertCanStartHANameNodes(MiniDFSCluster cluster,
|
||||
Configuration conf, String path) throws ServiceFailedException,
|
||||
IOException, URISyntaxException, InterruptedException {
|
||||
// Now should be able to start both NNs. Pass "false" here so that we don't
|
||||
// try to waitActive on all NNs, since the second NN doesn't exist yet.
|
||||
cluster.restartNameNode(0, false);
|
||||
cluster.restartNameNode(1, true);
|
||||
|
||||
// Make sure HA is working.
|
||||
cluster
|
||||
.getNameNode(0)
|
||||
.getRpcServer()
|
||||
.transitionToActive(
|
||||
new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
Path newPath = new Path(path);
|
||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
assertTrue(fs.mkdirs(newPath));
|
||||
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
|
||||
cluster.getNameNode(1));
|
||||
assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
|
||||
newPath.toString(), false).isDir());
|
||||
} finally {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,18 +17,12 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -42,10 +36,10 @@
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Trash;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
@ -53,9 +47,6 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||
@ -68,8 +59,6 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
@ -88,6 +77,7 @@
|
||||
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**********************************************************
|
||||
@ -767,9 +757,18 @@ private static boolean initializeSharedEdits(Configuration conf,
|
||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||
initializeGenericKeys(conf, nsId, namenodeId);
|
||||
|
||||
if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
|
||||
LOG.fatal("No shared edits directory configured for namespace " +
|
||||
nsId + " namenode " + namenodeId);
|
||||
return false;
|
||||
}
|
||||
|
||||
NNStorage existingStorage = null;
|
||||
try {
|
||||
FSNamesystem fsns = FSNamesystem.loadFromDisk(conf,
|
||||
Configuration confWithoutShared = new Configuration(conf);
|
||||
confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
|
||||
FSNamesystem fsns = FSNamesystem.loadFromDisk(confWithoutShared,
|
||||
FSNamesystem.getNamespaceDirs(conf),
|
||||
FSNamesystem.getNamespaceEditsDirs(conf, false));
|
||||
|
||||
@ -799,11 +798,9 @@ private static boolean initializeSharedEdits(Configuration conf,
|
||||
fsns.getFSImage().getEditLog().close();
|
||||
fsns.getFSImage().getEditLog().initJournalsForWrite();
|
||||
fsns.getFSImage().getEditLog().recoverUnclosedStreams();
|
||||
|
||||
if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs,
|
||||
newSharedStorage, conf)) {
|
||||
return true; // aborted
|
||||
}
|
||||
|
||||
copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
|
||||
conf);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not initialize shared edits dir", ioe);
|
||||
return true; // aborted
|
||||
@ -821,43 +818,59 @@ private static boolean initializeSharedEdits(Configuration conf,
|
||||
}
|
||||
return false; // did not abort
|
||||
}
|
||||
|
||||
private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
|
||||
|
||||
private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
|
||||
Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
|
||||
Configuration conf) throws FileNotFoundException, IOException {
|
||||
Configuration conf) throws IOException {
|
||||
Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
|
||||
"No shared edits specified");
|
||||
// Copy edit log segments into the new shared edits dir.
|
||||
for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) {
|
||||
FileJournalManager fjm = null;
|
||||
if (!(jas.getManager() instanceof FileJournalManager)) {
|
||||
LOG.error("Cannot populate shared edits dir from non-file " +
|
||||
"journal manager: " + jas.getManager());
|
||||
return true; // aborted
|
||||
} else {
|
||||
fjm = (FileJournalManager) jas.getManager();
|
||||
}
|
||||
for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage()
|
||||
.getMostRecentCheckpointTxId())) {
|
||||
File editLogSegment = elf.getFile();
|
||||
for (URI sharedEditsUri : sharedEditsDirs) {
|
||||
StorageDirectory sharedEditsDir = newSharedStorage
|
||||
.getStorageDirectory(sharedEditsUri);
|
||||
File targetFile = new File(sharedEditsDir.getCurrentDir(),
|
||||
editLogSegment.getName());
|
||||
if (!targetFile.exists()) {
|
||||
InputStream in = null;
|
||||
OutputStream out = null;
|
||||
try {
|
||||
in = new FileInputStream(editLogSegment);
|
||||
out = new AtomicFileOutputStream(targetFile);
|
||||
IOUtils.copyBytes(in, out, conf);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, in, out);
|
||||
}
|
||||
}
|
||||
List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
|
||||
FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
|
||||
sharedEditsUris);
|
||||
newSharedEditLog.initJournalsForWrite();
|
||||
newSharedEditLog.recoverUnclosedStreams();
|
||||
|
||||
FSEditLog sourceEditLog = fsns.getFSImage().editLog;
|
||||
|
||||
long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
|
||||
Collection<EditLogInputStream> streams = sourceEditLog.selectInputStreams(
|
||||
fromTxId+1, 0);
|
||||
|
||||
// Set the nextTxid to the CheckpointTxId+1
|
||||
newSharedEditLog.setNextTxId(fromTxId + 1);
|
||||
|
||||
// Copy all edits after last CheckpointTxId to shared edits dir
|
||||
for (EditLogInputStream stream : streams) {
|
||||
LOG.debug("Beginning to copy stream " + stream + " to shared edits");
|
||||
FSEditLogOp op;
|
||||
boolean segmentOpen = false;
|
||||
while ((op = stream.readOp()) != null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("copying op: " + op);
|
||||
}
|
||||
if (!segmentOpen) {
|
||||
newSharedEditLog.startLogSegment(op.txid, false);
|
||||
segmentOpen = true;
|
||||
}
|
||||
|
||||
newSharedEditLog.logEdit(op);
|
||||
|
||||
if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
|
||||
newSharedEditLog.logSync();
|
||||
newSharedEditLog.endCurrentLogSegment(false);
|
||||
LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream);
|
||||
segmentOpen = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (segmentOpen) {
|
||||
LOG.debug("ending log segment because of end of stream in " + stream);
|
||||
newSharedEditLog.logSync();
|
||||
newSharedEditLog.endCurrentLogSegment(false);
|
||||
segmentOpen = false;
|
||||
}
|
||||
}
|
||||
return false; // did not abort
|
||||
}
|
||||
|
||||
private static boolean finalize(Configuration conf,
|
||||
|
@ -65,7 +65,7 @@ public abstract class HATestUtil {
|
||||
* @throws CouldNotCatchUpException if the standby doesn't catch up to the
|
||||
* active in NN_LAG_TIMEOUT milliseconds
|
||||
*/
|
||||
static void waitForStandbyToCatchUp(NameNode active,
|
||||
public static void waitForStandbyToCatchUp(NameNode active,
|
||||
NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
|
||||
|
||||
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||
|
@ -158,6 +158,13 @@ public void testInitializeSharedEdits() throws Exception {
|
||||
assertCanStartHaNameNodes("2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailWhenNoSharedEditsSpecified() throws Exception {
|
||||
Configuration confNoShared = new Configuration(conf);
|
||||
confNoShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
|
||||
assertFalse(NameNode.initializeSharedEdits(confNoShared, true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDontOverWriteExistingDir() {
|
||||
assertFalse(NameNode.initializeSharedEdits(conf, false));
|
||||
|
Loading…
Reference in New Issue
Block a user