HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356388 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-07-02 18:21:49 +00:00
parent 2b2344a57e
commit 7accbabdee
9 changed files with 77 additions and 33 deletions

View File

@ -102,6 +102,8 @@ Trunk (unreleased changes)
HDFS-3510. Editlog pre-allocation is performed prior to writing edits HDFS-3510. Editlog pre-allocation is performed prior to writing edits
to avoid partial edits case disk out of space.(Colin McCabe via suresh) to avoid partial edits case disk out of space.(Colin McCabe via suresh)
HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers (todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration;
@ -158,11 +159,19 @@ private byte[] intToBytes(int i) {
(byte)(i) }; (byte)(i) };
} }
BookKeeperJournalManager(Configuration conf, URI uri) throws IOException {
this(conf, uri, null);
// TODO(ivank): update BookKeeperJournalManager to do something
// with the NamespaceInfo. This constructor has been added
// for compatibility with the old tests, and may be removed
// when the tests are updated.
}
/** /**
* Construct a Bookkeeper journal manager. * Construct a Bookkeeper journal manager.
*/ */
public BookKeeperJournalManager(Configuration conf, URI uri) public BookKeeperJournalManager(Configuration conf, URI uri,
throws IOException { NamespaceInfo nsInfo) throws IOException {
this.conf = conf; this.conf = conf;
String zkConnect = uri.getAuthority().replace(";", ","); String zkConnect = uri.getAuthority().replace(";", ",");
String zkPath = uri.getPath(); String zkPath = uri.getPath();

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -324,7 +325,7 @@ synchronized void close() {
endCurrentLogSegment(true); endCurrentLogSegment(true);
} }
if (!journalSet.isEmpty()) { if (journalSet != null && !journalSet.isEmpty()) {
try { try {
journalSet.close(); journalSet.close();
} catch (IOException ioe) { } catch (IOException ioe) {
@ -1010,7 +1011,10 @@ public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
minTxIdToKeep <= curSegmentTxId : minTxIdToKeep <= curSegmentTxId :
"cannot purge logs older than txid " + minTxIdToKeep + "cannot purge logs older than txid " + minTxIdToKeep +
" when current segment starts at " + curSegmentTxId; " when current segment starts at " + curSegmentTxId;
if (minTxIdToKeep == 0) {
return;
}
// This could be improved to not need synchronization. But currently, // This could be improved to not need synchronization. But currently,
// journalSet is not threadsafe, so we need to synchronize this method. // journalSet is not threadsafe, so we need to synchronize this method.
try { try {
@ -1260,8 +1264,9 @@ private JournalManager createJournal(URI uri) {
try { try {
Constructor<? extends JournalManager> cons Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class); = clazz.getConstructor(Configuration.class, URI.class,
return cons.newInstance(conf, uri); NamespaceInfo.class);
return cons.newInstance(conf, uri, storage.getNamespaceInfo());
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, " throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e); + uri, e);

View File

@ -126,12 +126,6 @@ protected FSImage(Configuration conf,
} }
this.editLog = new FSEditLog(conf, storage, editsDirs); this.editLog = new FSEditLog(conf, storage, editsDirs);
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
editLog.initJournalsForWrite();
} else {
editLog.initSharedJournalsForRead();
}
archivalManager = new NNStorageRetentionManager(conf, storage, editLog); archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
} }
@ -511,6 +505,7 @@ void doImportCheckpoint(FSNamesystem target) throws IOException {
// return back the real image // return back the real image
realImage.getStorage().setStorageInfo(ckptImage.getStorage()); realImage.getStorage().setStorageInfo(ckptImage.getStorage());
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1); realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
realImage.initEditLog();
target.dir.fsImage = realImage; target.dir.fsImage = realImage;
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID()); realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
@ -584,10 +579,8 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
Iterable<EditLogInputStream> editStreams = null; Iterable<EditLogInputStream> editStreams = null;
if (editLog.isOpenForWrite()) { initEditLog();
// We only want to recover streams if we're going into Active mode.
editLog.recoverUnclosedStreams();
}
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) { getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so // If we're open for write, we're either non-HA or we're the active NN, so
@ -645,6 +638,17 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
return needToSave; return needToSave;
} }
public void initEditLog() {
Preconditions.checkState(getNamespaceID() != 0,
"Must know namespace ID before initting edit log");
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
editLog.initJournalsForWrite();
editLog.recoverUnclosedStreams();
} else {
editLog.initSharedJournalsForRead();
}
}
/** /**
* @param imageFile the image file that was loaded * @param imageFile the image file that was loaded

View File

@ -228,12 +228,6 @@ synchronized public void selectInputStreams(
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)"); "from among " + elfs.size() + " candidate file(s)");
for (EditLogFile elf : elfs) { for (EditLogFile elf : elfs) {
if (elf.lastTxId < fromTxId) {
LOG.debug("passing over " + elf + " because it ends at " +
elf.lastTxId + ", but we only care about transactions " +
"as new as " + fromTxId);
continue;
}
if (elf.isInProgress()) { if (elf.isInProgress()) {
if (!inProgressOk) { if (!inProgressOk) {
LOG.debug("passing over " + elf + " because it is in progress " + LOG.debug("passing over " + elf + " because it is in progress " +
@ -248,6 +242,13 @@ synchronized public void selectInputStreams(
continue; continue;
} }
} }
if (elf.lastTxId < fromTxId) {
assert elf.lastTxId != HdfsConstants.INVALID_TXID;
LOG.debug("passing over " + elf + " because it ends at " +
elf.lastTxId + ", but we only care about transactions " +
"as new as " + fromTxId);
continue;
}
EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress()); elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
LOG.debug("selecting edit log stream " + elf); LOG.debug("selecting edit log stream " + elf);

View File

@ -1125,4 +1125,13 @@ FSImageStorageInspector readAndInspectDirs()
inspectStorageDirs(inspector); inspectStorageDirs(inspector);
return inspector; return inspector;
} }
public NamespaceInfo getNamespaceInfo() {
return new NamespaceInfo(
getNamespaceID(),
getClusterID(),
getBlockPoolID(),
getCTime(),
getDistributedUpgradeVersion());
}
} }

View File

@ -67,7 +67,6 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
@ -830,12 +829,7 @@ private static boolean initializeSharedEdits(Configuration conf,
Lists.<URI>newArrayList(), Lists.<URI>newArrayList(),
sharedEditsDirs); sharedEditsDirs);
newSharedStorage.format(new NamespaceInfo( newSharedStorage.format(existingStorage.getNamespaceInfo());
existingStorage.getNamespaceID(),
existingStorage.getClusterID(),
existingStorage.getBlockPoolID(),
existingStorage.getCTime(),
existingStorage.getDistributedUpgradeVersion()));
// Need to make sure the edit log segments are in good shape to initialize // Need to make sure the edit log segments are in good shape to initialize
// the shared edits dir. // the shared edits dir.

View File

@ -188,6 +188,8 @@ private int doRun() throws IOException {
// Load the newly formatted image, using all of the directories (including shared // Load the newly formatted image, using all of the directories (including shared
// edits) // edits)
FSImage image = new FSImage(conf); FSImage image = new FSImage(conf);
image.getStorage().setStorageInfo(storage);
image.initEditLog();
assert image.getEditLog().isOpenForRead() : assert image.getEditLog().isOpenForRead() :
"Expected edit log to be open for read"; "Expected edit log to be open for read";

View File

@ -24,12 +24,15 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.io.IOException; import java.io.IOException;
public class TestGenericJournalConf { public class TestGenericJournalConf {
private static final String DUMMY_URI = "dummy://test";
/** /**
* Test that an exception is thrown if a journal class doesn't exist * Test that an exception is thrown if a journal class doesn't exist
* in the configuration * in the configuration
@ -114,12 +117,17 @@ public void testDummyJournalManager() throws Exception {
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy", conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy",
DummyJournalManager.class.getName()); DummyJournalManager.class.getName());
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, DUMMY_URI);
"dummy://test");
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive(); cluster.waitActive();
assertNotNull(DummyJournalManager.conf);
assertEquals(new URI(DUMMY_URI), DummyJournalManager.uri);
assertNotNull(DummyJournalManager.nsInfo);
assertEquals(DummyJournalManager.nsInfo.getClusterID(),
cluster.getNameNode().getNamesystem().getClusterId());
} finally { } finally {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
@ -128,7 +136,17 @@ public void testDummyJournalManager() throws Exception {
} }
public static class DummyJournalManager implements JournalManager { public static class DummyJournalManager implements JournalManager {
public DummyJournalManager(Configuration conf, URI u) {} static Configuration conf = null;
static URI uri = null;
static NamespaceInfo nsInfo = null;
public DummyJournalManager(Configuration conf, URI u,
NamespaceInfo nsInfo) {
// Set static vars so the test case can verify them.
DummyJournalManager.conf = conf;
DummyJournalManager.uri = u;
DummyJournalManager.nsInfo = nsInfo;
}
@Override @Override
public EditLogOutputStream startLogSegment(long txId) throws IOException { public EditLogOutputStream startLogSegment(long txId) throws IOException {
@ -162,7 +180,7 @@ public void close() throws IOException {}
public static class BadConstructorJournalManager extends DummyJournalManager { public static class BadConstructorJournalManager extends DummyJournalManager {
public BadConstructorJournalManager() { public BadConstructorJournalManager() {
super(null, null); super(null, null, null);
} }
} }
} }