HDFS-3259. NameNode#initializeSharedEdits should populate shared edits dir with edit log segments. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1325518 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
047a7b276c
commit
07a4367445
@ -373,6 +373,9 @@ Release 2.0.0 - UNRELEASED
|
|||||||
HDFS-2983. Relax the build version check to permit rolling upgrades within
|
HDFS-2983. Relax the build version check to permit rolling upgrades within
|
||||||
a release. (atm)
|
a release. (atm)
|
||||||
|
|
||||||
|
HDFS-3259. NameNode#initializeSharedEdits should populate shared edits dir
|
||||||
|
with edit log segments. (atm)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
|
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
|
||||||
|
@ -311,11 +311,13 @@ synchronized void close() {
|
|||||||
endCurrentLogSegment(true);
|
endCurrentLogSegment(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!journalSet.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
journalSet.close();
|
journalSet.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Error closing journalSet", ioe);
|
LOG.warn("Error closing journalSet", ioe);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
state = State.CLOSED;
|
state = State.CLOSED;
|
||||||
}
|
}
|
||||||
@ -813,9 +815,8 @@ void logReassignLease(String leaseHolder, String src, String newHolder) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used only by unit tests.
|
* Get all the journals this edit log is currently operating on.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
synchronized List<JournalAndStream> getJournals() {
|
synchronized List<JournalAndStream> getJournals() {
|
||||||
return journalSet.getAllJournalStreams();
|
return journalSet.getAllJournalStreams();
|
||||||
}
|
}
|
||||||
|
@ -344,7 +344,7 @@ synchronized public void recoverUnfinalizedSegments() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
|
List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
|
||||||
File currentDir = sd.getCurrentDir();
|
File currentDir = sd.getCurrentDir();
|
||||||
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
|
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
|
||||||
List<EditLogFile> logFiles = Lists.newArrayList();
|
List<EditLogFile> logFiles = Lists.newArrayList();
|
||||||
|
@ -18,14 +18,17 @@
|
|||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
@ -41,7 +44,6 @@
|
|||||||
import org.apache.hadoop.fs.Trash;
|
import org.apache.hadoop.fs.Trash;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HAUtil;
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
@ -49,6 +51,9 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
|
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
|
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||||
@ -61,6 +66,8 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
@ -751,7 +758,8 @@ public static boolean initializeSharedEdits(Configuration conf,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Format a new shared edits dir.
|
* Format a new shared edits dir and copy in enough edit log segments so that
|
||||||
|
* the standby NN can start up.
|
||||||
*
|
*
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
* @param force format regardless of whether or not the shared edits dir exists
|
* @param force format regardless of whether or not the shared edits dir exists
|
||||||
@ -785,8 +793,19 @@ private static boolean initializeSharedEdits(Configuration conf,
|
|||||||
existingStorage.getBlockPoolID(),
|
existingStorage.getBlockPoolID(),
|
||||||
existingStorage.getCTime(),
|
existingStorage.getCTime(),
|
||||||
existingStorage.getDistributedUpgradeVersion()));
|
existingStorage.getDistributedUpgradeVersion()));
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Could not format shared edits dir", e);
|
// Need to make sure the edit log segments are in good shape to initialize
|
||||||
|
// the shared edits dir.
|
||||||
|
fsns.getFSImage().getEditLog().close();
|
||||||
|
fsns.getFSImage().getEditLog().initJournalsForWrite();
|
||||||
|
fsns.getFSImage().getEditLog().recoverUnclosedStreams();
|
||||||
|
|
||||||
|
if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs,
|
||||||
|
newSharedStorage, conf)) {
|
||||||
|
return true; // aborted
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Could not initialize shared edits dir", ioe);
|
||||||
return true; // aborted
|
return true; // aborted
|
||||||
} finally {
|
} finally {
|
||||||
// Have to unlock storage explicitly for the case when we're running in a
|
// Have to unlock storage explicitly for the case when we're running in a
|
||||||
@ -803,6 +822,44 @@ private static boolean initializeSharedEdits(Configuration conf,
|
|||||||
return false; // did not abort
|
return false; // did not abort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
|
||||||
|
Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
|
||||||
|
Configuration conf) throws FileNotFoundException, IOException {
|
||||||
|
// Copy edit log segments into the new shared edits dir.
|
||||||
|
for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) {
|
||||||
|
FileJournalManager fjm = null;
|
||||||
|
if (!(jas.getManager() instanceof FileJournalManager)) {
|
||||||
|
LOG.error("Cannot populate shared edits dir from non-file " +
|
||||||
|
"journal manager: " + jas.getManager());
|
||||||
|
return true; // aborted
|
||||||
|
} else {
|
||||||
|
fjm = (FileJournalManager) jas.getManager();
|
||||||
|
}
|
||||||
|
for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage()
|
||||||
|
.getMostRecentCheckpointTxId())) {
|
||||||
|
File editLogSegment = elf.getFile();
|
||||||
|
for (URI sharedEditsUri : sharedEditsDirs) {
|
||||||
|
StorageDirectory sharedEditsDir = newSharedStorage
|
||||||
|
.getStorageDirectory(sharedEditsUri);
|
||||||
|
File targetFile = new File(sharedEditsDir.getCurrentDir(),
|
||||||
|
editLogSegment.getName());
|
||||||
|
if (!targetFile.exists()) {
|
||||||
|
InputStream in = null;
|
||||||
|
OutputStream out = null;
|
||||||
|
try {
|
||||||
|
in = new FileInputStream(editLogSegment);
|
||||||
|
out = new AtomicFileOutputStream(targetFile);
|
||||||
|
IOUtils.copyBytes(in, out, conf);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, in, out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false; // did not abort
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean finalize(Configuration conf,
|
private static boolean finalize(Configuration conf,
|
||||||
boolean isConfirmationNeeded
|
boolean isConfirmationNeeded
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
|
@ -19,17 +19,22 @@
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.ha.ServiceFailedException;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -48,6 +53,9 @@ public class TestInitializeSharedEdits {
|
|||||||
@Before
|
@Before
|
||||||
public void setupCluster() throws IOException {
|
public void setupCluster() throws IOException {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||||
|
HAUtil.setAllowStandbyReads(conf, true);
|
||||||
|
|
||||||
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
|
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
|
||||||
|
|
||||||
@ -57,10 +65,7 @@ public void setupCluster() throws IOException {
|
|||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
cluster.shutdownNameNode(0);
|
shutdownClusterAndRemoveSharedEditsDir();
|
||||||
cluster.shutdownNameNode(1);
|
|
||||||
File sharedEditsDir = new File(cluster.getSharedEditsDir(0, 1));
|
|
||||||
assertTrue(FileUtil.fullyDelete(sharedEditsDir));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -70,8 +75,14 @@ public void shutdownCluster() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
private void shutdownClusterAndRemoveSharedEditsDir() throws IOException {
|
||||||
public void testInitializeSharedEdits() throws Exception {
|
cluster.shutdownNameNode(0);
|
||||||
|
cluster.shutdownNameNode(1);
|
||||||
|
File sharedEditsDir = new File(cluster.getSharedEditsDir(0, 1));
|
||||||
|
assertTrue(FileUtil.fullyDelete(sharedEditsDir));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertCannotStartNameNodes() {
|
||||||
// Make sure we can't currently start either NN.
|
// Make sure we can't currently start either NN.
|
||||||
try {
|
try {
|
||||||
cluster.restartNameNode(0, false);
|
cluster.restartNameNode(0, false);
|
||||||
@ -89,24 +100,27 @@ public void testInitializeSharedEdits() throws Exception {
|
|||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
"Cannot start an HA namenode with name dirs that need recovery", ioe);
|
"Cannot start an HA namenode with name dirs that need recovery", ioe);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize the shared edits dir.
|
private void assertCanStartHaNameNodes(String pathSuffix)
|
||||||
assertFalse(NameNode.initializeSharedEdits(conf));
|
throws ServiceFailedException, IOException, URISyntaxException,
|
||||||
|
InterruptedException {
|
||||||
// Now should be able to start both NNs. Pass "false" here so that we don't
|
// Now should be able to start both NNs. Pass "false" here so that we don't
|
||||||
// try to waitActive on all NNs, since the second NN doesn't exist yet.
|
// try to waitActive on all NNs, since the second NN doesn't exist yet.
|
||||||
cluster.restartNameNode(0, false);
|
cluster.restartNameNode(0, false);
|
||||||
cluster.restartNameNode(1, true);
|
cluster.restartNameNode(1, true);
|
||||||
|
|
||||||
// Make sure HA is working.
|
// Make sure HA is working.
|
||||||
cluster.transitionToActive(0);
|
cluster.getNameNode(0).getRpcServer().transitionToActive();
|
||||||
FileSystem fs = null;
|
FileSystem fs = null;
|
||||||
try {
|
try {
|
||||||
|
Path newPath = new Path(TEST_PATH, pathSuffix);
|
||||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
assertTrue(fs.mkdirs(TEST_PATH));
|
assertTrue(fs.mkdirs(newPath));
|
||||||
cluster.transitionToStandby(0);
|
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
|
||||||
cluster.transitionToActive(1);
|
cluster.getNameNode(1));
|
||||||
assertTrue(fs.isDirectory(TEST_PATH));
|
assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
|
||||||
|
newPath.toString(), false).isDir());
|
||||||
} finally {
|
} finally {
|
||||||
if (fs != null) {
|
if (fs != null) {
|
||||||
fs.close();
|
fs.close();
|
||||||
@ -114,6 +128,29 @@ public void testInitializeSharedEdits() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitializeSharedEdits() throws Exception {
|
||||||
|
assertCannotStartNameNodes();
|
||||||
|
|
||||||
|
// Initialize the shared edits dir.
|
||||||
|
assertFalse(NameNode.initializeSharedEdits(cluster.getConfiguration(0)));
|
||||||
|
|
||||||
|
assertCanStartHaNameNodes("1");
|
||||||
|
|
||||||
|
// Now that we've done a metadata operation, make sure that deleting and
|
||||||
|
// re-initializing the shared edits dir will let the standby still start.
|
||||||
|
|
||||||
|
shutdownClusterAndRemoveSharedEditsDir();
|
||||||
|
|
||||||
|
assertCannotStartNameNodes();
|
||||||
|
|
||||||
|
// Re-initialize the shared edits dir.
|
||||||
|
assertFalse(NameNode.initializeSharedEdits(cluster.getConfiguration(0)));
|
||||||
|
|
||||||
|
// Should *still* be able to start both NNs
|
||||||
|
assertCanStartHaNameNodes("2");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDontOverWriteExistingDir() {
|
public void testDontOverWriteExistingDir() {
|
||||||
assertFalse(NameNode.initializeSharedEdits(conf, false));
|
assertFalse(NameNode.initializeSharedEdits(conf, false));
|
||||||
|
Loading…
Reference in New Issue
Block a user