diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java index 0954eaf6c1..c772dfcb90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java @@ -17,18 +17,10 @@ */ package org.apache.hadoop.hdfs.qjournal.server; -import static org.apache.hadoop.util.ExitUtil.terminate; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; - -import javax.management.ObjectName; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -47,14 +39,22 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.tracing.TraceUtils; import org.apache.hadoop.util.DiskChecker; +import static org.apache.hadoop.util.ExitUtil.terminate; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.htrace.core.Tracer; import org.eclipse.jetty.util.ajax.JSON; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; +import javax.management.ObjectName; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** * The JournalNode is a daemon which allows namenodes using @@ -74,7 +74,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { .newHashMap(); private ObjectName journalNodeInfoBeanName; private String httpServerURI; - private File localDir; + private final ArrayList localDir = Lists.newArrayList(); Tracer tracer; static { @@ -94,11 +94,10 @@ synchronized Journal getOrCreateJournal(String jid, Journal journal = journalsById.get(jid); if (journal == null) { - File logDir = getLogDir(jid); - LOG.info("Initializing journal in directory " + logDir); + File logDir = getLogDir(jid, nameServiceId); + LOG.info("Initializing journal in directory " + logDir); journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter()); journalsById.put(jid, journal); - // Start SyncJouranl thread, if JournalNode Sync is enabled if (conf.getBoolean( DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, @@ -148,9 +147,34 @@ public Journal getOrCreateJournal(String jid, @Override public void setConf(Configuration conf) { this.conf = conf; - this.localDir = new File( - conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, - DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT).trim()); + + String journalNodeDir = null; + Collection nameserviceIds; + + nameserviceIds = conf.getTrimmedStringCollection( + DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); + + if (nameserviceIds.size() == 0) { + nameserviceIds = conf.getTrimmedStringCollection( + DFSConfigKeys.DFS_NAMESERVICES); + } + + //if nameservicesIds size is less than 2, it means it is not a federated + // setup + if (nameserviceIds.size() < 2) { + // Check in HA, if journal edit dir is set by appending with + // nameserviceId + for (String nameService : nameserviceIds) { + journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + + "." + nameService); + } + if (journalNodeDir == null) { + journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, + DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT); + } + localDir.add(new File(journalNodeDir.trim())); + } + if (this.tracer == null) { this.tracer = new Tracer.Builder("JournalNode"). conf(TraceUtils.wrapHadoopConf("journalnode.htrace", conf)). @@ -158,12 +182,13 @@ public void setConf(Configuration conf) { } } - private static void validateAndCreateJournalDir(File dir) throws IOException { + private static void validateAndCreateJournalDir(File dir) + throws IOException { + if (!dir.isAbsolute()) { throw new IllegalArgumentException( "Journal dir '" + dir + "' should be an absolute path"); } - DiskChecker.checkDir(dir); } @@ -186,8 +211,9 @@ public void start() throws IOException { try { - validateAndCreateJournalDir(localDir); - + for (File journalDir : localDir) { + validateAndCreateJournalDir(journalDir); + } DefaultMetricsSystem.initialize("JournalNode"); JvmMetrics.create("JournalNode", conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY), @@ -297,16 +323,33 @@ public void stopAndJoin(int rc) throws InterruptedException { * @param jid the journal identifier * @return the file, which may or may not exist yet */ - private File getLogDir(String jid) { - String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, - DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT); + private File getLogDir(String jid, String nameServiceId) throws IOException{ + String dir = null; + if (nameServiceId != null) { + dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + "." + + nameServiceId); + } + if (dir == null) { + dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, + DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT); + } + + File journalDir = new File(dir.trim()); + if (!localDir.contains(journalDir)) { + //It is a federated setup, we need to validate journalDir + validateAndCreateJournalDir(journalDir); + localDir.add(journalDir); + } + Preconditions.checkArgument(jid != null && !jid.isEmpty(), "bad journal identifier: %s", jid); assert jid != null; - return new File(new File(dir), jid); + return new File(journalDir, jid); } + + @Override // JournalNodeMXBean public String getJournalsStatus() { // jid:{Formatted:True/False} @@ -328,20 +371,22 @@ public String getJournalsStatus() { // Also note that we do not need to check localDir here since // validateAndCreateJournalDir has been called before we register the // MXBean. - File[] journalDirs = localDir.listFiles(new FileFilter() { - @Override - public boolean accept(File file) { - return file.isDirectory(); - } - }); + for (File jDir : localDir) { + File[] journalDirs = jDir.listFiles(new FileFilter() { + @Override + public boolean accept(File file) { + return file.isDirectory(); + } + }); - if (journalDirs != null) { - for (File journalDir : journalDirs) { - String jid = journalDir.getName(); - if (!status.containsKey(jid)) { - Map jMap = new HashMap(); - jMap.put("Formatted", "true"); - status.put(jid, jMap); + if (journalDirs != null) { + for (File journalDir : journalDirs) { + String jid = journalDir.getName(); + if (!status.containsKey(jid)) { + Map jMap = new HashMap(); + jMap.put("Formatted", "true"); + status.put(jid, jMap); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index 9bd686ff7d..581218d161 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -17,23 +17,14 @@ */ package org.apache.hadoop.hdfs.qjournal.server; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - +import com.google.common.base.Charsets; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Ints; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel; @@ -52,16 +43,21 @@ import org.apache.hadoop.util.StopWatch; import org.junit.After; import org.junit.Assert; +import static org.junit.Assert.*; import org.junit.Before; import org.junit.Rule; import org.junit.Test; - -import com.google.common.base.Charsets; -import com.google.common.primitives.Bytes; -import com.google.common.primitives.Ints; import org.junit.rules.TestName; import org.mockito.Mockito; +import java.io.File; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + public class TestJournalNode { private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( @@ -87,9 +83,29 @@ public void setup() throws Exception { File editsDir = new File(MiniDFSCluster.getBaseDirectory() + File.separator + "TestJournalNode"); FileUtil.fullyDelete(editsDir); - - conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, - editsDir.getAbsolutePath()); + + if (testName.getMethodName().equals("testJournalDirPerNameSpace")) { + setFederationConf(); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns1", + editsDir + File.separator + "ns1"); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns2", + editsDir + File.separator + "ns2"); + } else if (testName.getMethodName().equals( + "testJournalCommonDirAcrossNameSpace")){ + setFederationConf(); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, + editsDir.getAbsolutePath()); + } else if (testName.getMethodName().equals( + "testJournalDefaultDirForOneNameSpace")) { + FileUtil.fullyDelete(new File(DFSConfigKeys + .DFS_JOURNALNODE_EDITS_DIR_DEFAULT)); + setFederationConf(); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns1", + editsDir + File.separator + "ns1"); + } else { + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, + editsDir.getAbsolutePath()); + } conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); if (testName.getMethodName().equals( @@ -128,18 +144,102 @@ public void setup() throws Exception { jn = new JournalNode(); jn.setConf(conf); jn.start(); - journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId(); - journal = jn.getOrCreateJournal(journalId); - journal.format(FAKE_NSINFO); + + + if (testName.getMethodName().equals("testJournalDirPerNameSpace") || + testName.getMethodName().equals( + "testJournalCommonDirAcrossNameSpace") || + testName.getMethodName().equals( + "testJournalDefaultDirForOneNameSpace")) { + Collection nameServiceIds = DFSUtilClient.getNameServiceIds(conf); + for(String nsId: nameServiceIds) { + journalId = "test-journalid-" + nsId; + journal = jn.getOrCreateJournal(journalId, nsId, + HdfsServerConstants.StartupOption.REGULAR); + NamespaceInfo fakeNameSpaceInfo = new NamespaceInfo( + 12345, "mycluster", "my-bp"+nsId, 0L); + journal.format(fakeNameSpaceInfo); + } + } else { + journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId(); + journal = jn.getOrCreateJournal(journalId); + journal.format(FAKE_NSINFO); + } + ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); } + + private void setFederationConf() { + conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1, ns2"); + + //ns1 + conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2"); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1", + "qjournal://journalnode0:9900;journalnode1:9901/ns1"); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2", + "qjournal://journalnode0:9900;journalnode1:9901/ns2"); + + //ns2 + conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns2", "nn3,nn4"); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns2" +".nn3", + "qjournal://journalnode0:9900;journalnode1:9901/ns2"); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns2" +".nn4", + "qjournal://journalnode0:9900;journalnode1:9901/ns2"); + } @After public void teardown() throws Exception { jn.stop(0); } - + + @Test(timeout=100000) + public void testJournalDirPerNameSpace() { + Collection nameServiceIds = DFSUtilClient.getNameServiceIds(conf); + setupStaticHostResolution(2, "journalnode"); + for (String nsId : nameServiceIds) { + String jid = "test-journalid-" + nsId; + Journal nsJournal = jn.getJournal(jid); + JNStorage journalStorage = nsJournal.getStorage(); + File editsDir = new File(MiniDFSCluster.getBaseDirectory() + + File.separator + "TestJournalNode" + File.separator + + nsId + File.separator + jid); + assertEquals(editsDir.toString(), journalStorage.getRoot().toString()); + } + } + + @Test(timeout=100000) + public void testJournalCommonDirAcrossNameSpace() { + Collection nameServiceIds = DFSUtilClient.getNameServiceIds(conf); + setupStaticHostResolution(2, "journalnode"); + for (String nsId : nameServiceIds) { + String jid = "test-journalid-" + nsId; + Journal nsJournal = jn.getJournal(jid); + JNStorage journalStorage = nsJournal.getStorage(); + File editsDir = new File(MiniDFSCluster.getBaseDirectory() + + File.separator + "TestJournalNode" + File.separator + jid); + assertEquals(editsDir.toString(), journalStorage.getRoot().toString()); + } + } + + @Test(timeout=100000) + public void testJournalDefaultDirForOneNameSpace() { + Collection nameServiceIds = DFSUtilClient.getNameServiceIds(conf); + setupStaticHostResolution(2, "journalnode"); + String jid = "test-journalid-ns1"; + Journal nsJournal = jn.getJournal(jid); + JNStorage journalStorage = nsJournal.getStorage(); + File editsDir = new File(MiniDFSCluster.getBaseDirectory() + + File.separator + "TestJournalNode" + File.separator + "ns1" + File + .separator + jid); + assertEquals(editsDir.toString(), journalStorage.getRoot().toString()); + jid = "test-journalid-ns2"; + nsJournal = jn.getJournal(jid); + journalStorage = nsJournal.getStorage(); + editsDir = new File(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT + + File.separator + jid); + assertEquals(editsDir.toString(), journalStorage.getRoot().toString()); + } @Test(timeout=100000) public void testJournal() throws Exception { MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(