HDFS-13062. Provide support for JN to use separate journal disk per namespace. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
b3ae11d597
commit
dd50f53997
@ -17,18 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.qjournal.server;
|
||||
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
@ -47,14 +39,22 @@
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.tracing.TraceUtils;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.htrace.core.Tracer;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Maps;
|
||||
import javax.management.ObjectName;
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The JournalNode is a daemon which allows namenodes using
|
||||
@ -74,7 +74,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
||||
.newHashMap();
|
||||
private ObjectName journalNodeInfoBeanName;
|
||||
private String httpServerURI;
|
||||
private File localDir;
|
||||
private final ArrayList<File> localDir = Lists.newArrayList();
|
||||
Tracer tracer;
|
||||
|
||||
static {
|
||||
@ -94,11 +94,10 @@ synchronized Journal getOrCreateJournal(String jid,
|
||||
|
||||
Journal journal = journalsById.get(jid);
|
||||
if (journal == null) {
|
||||
File logDir = getLogDir(jid);
|
||||
LOG.info("Initializing journal in directory " + logDir);
|
||||
File logDir = getLogDir(jid, nameServiceId);
|
||||
LOG.info("Initializing journal in directory " + logDir);
|
||||
journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
|
||||
journalsById.put(jid, journal);
|
||||
|
||||
// Start SyncJouranl thread, if JournalNode Sync is enabled
|
||||
if (conf.getBoolean(
|
||||
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
|
||||
@ -148,9 +147,34 @@ public Journal getOrCreateJournal(String jid,
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.localDir = new File(
|
||||
conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT).trim());
|
||||
|
||||
String journalNodeDir = null;
|
||||
Collection<String> nameserviceIds;
|
||||
|
||||
nameserviceIds = conf.getTrimmedStringCollection(
|
||||
DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
|
||||
|
||||
if (nameserviceIds.size() == 0) {
|
||||
nameserviceIds = conf.getTrimmedStringCollection(
|
||||
DFSConfigKeys.DFS_NAMESERVICES);
|
||||
}
|
||||
|
||||
//if nameservicesIds size is less than 2, it means it is not a federated
|
||||
// setup
|
||||
if (nameserviceIds.size() < 2) {
|
||||
// Check in HA, if journal edit dir is set by appending with
|
||||
// nameserviceId
|
||||
for (String nameService : nameserviceIds) {
|
||||
journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY +
|
||||
"." + nameService);
|
||||
}
|
||||
if (journalNodeDir == null) {
|
||||
journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
|
||||
}
|
||||
localDir.add(new File(journalNodeDir.trim()));
|
||||
}
|
||||
|
||||
if (this.tracer == null) {
|
||||
this.tracer = new Tracer.Builder("JournalNode").
|
||||
conf(TraceUtils.wrapHadoopConf("journalnode.htrace", conf)).
|
||||
@ -158,12 +182,13 @@ public void setConf(Configuration conf) {
|
||||
}
|
||||
}
|
||||
|
||||
private static void validateAndCreateJournalDir(File dir) throws IOException {
|
||||
private static void validateAndCreateJournalDir(File dir)
|
||||
throws IOException {
|
||||
|
||||
if (!dir.isAbsolute()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Journal dir '" + dir + "' should be an absolute path");
|
||||
}
|
||||
|
||||
DiskChecker.checkDir(dir);
|
||||
}
|
||||
|
||||
@ -186,8 +211,9 @@ public void start() throws IOException {
|
||||
|
||||
try {
|
||||
|
||||
validateAndCreateJournalDir(localDir);
|
||||
|
||||
for (File journalDir : localDir) {
|
||||
validateAndCreateJournalDir(journalDir);
|
||||
}
|
||||
DefaultMetricsSystem.initialize("JournalNode");
|
||||
JvmMetrics.create("JournalNode",
|
||||
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
|
||||
@ -297,16 +323,33 @@ public void stopAndJoin(int rc) throws InterruptedException {
|
||||
* @param jid the journal identifier
|
||||
* @return the file, which may or may not exist yet
|
||||
*/
|
||||
private File getLogDir(String jid) {
|
||||
String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
|
||||
private File getLogDir(String jid, String nameServiceId) throws IOException{
|
||||
String dir = null;
|
||||
if (nameServiceId != null) {
|
||||
dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + "." +
|
||||
nameServiceId);
|
||||
}
|
||||
if (dir == null) {
|
||||
dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
|
||||
}
|
||||
|
||||
File journalDir = new File(dir.trim());
|
||||
if (!localDir.contains(journalDir)) {
|
||||
//It is a federated setup, we need to validate journalDir
|
||||
validateAndCreateJournalDir(journalDir);
|
||||
localDir.add(journalDir);
|
||||
}
|
||||
|
||||
Preconditions.checkArgument(jid != null &&
|
||||
!jid.isEmpty(),
|
||||
"bad journal identifier: %s", jid);
|
||||
assert jid != null;
|
||||
return new File(new File(dir), jid);
|
||||
return new File(journalDir, jid);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override // JournalNodeMXBean
|
||||
public String getJournalsStatus() {
|
||||
// jid:{Formatted:True/False}
|
||||
@ -328,20 +371,22 @@ public String getJournalsStatus() {
|
||||
// Also note that we do not need to check localDir here since
|
||||
// validateAndCreateJournalDir has been called before we register the
|
||||
// MXBean.
|
||||
File[] journalDirs = localDir.listFiles(new FileFilter() {
|
||||
@Override
|
||||
public boolean accept(File file) {
|
||||
return file.isDirectory();
|
||||
}
|
||||
});
|
||||
for (File jDir : localDir) {
|
||||
File[] journalDirs = jDir.listFiles(new FileFilter() {
|
||||
@Override
|
||||
public boolean accept(File file) {
|
||||
return file.isDirectory();
|
||||
}
|
||||
});
|
||||
|
||||
if (journalDirs != null) {
|
||||
for (File journalDir : journalDirs) {
|
||||
String jid = journalDir.getName();
|
||||
if (!status.containsKey(jid)) {
|
||||
Map<String, String> jMap = new HashMap<String, String>();
|
||||
jMap.put("Formatted", "true");
|
||||
status.put(jid, jMap);
|
||||
if (journalDirs != null) {
|
||||
for (File journalDir : journalDirs) {
|
||||
String jid = journalDir.getName();
|
||||
if (!status.containsKey(jid)) {
|
||||
Map<String, String> jMap = new HashMap<String, String>();
|
||||
jMap.put("Formatted", "true");
|
||||
status.put(jid, jMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,23 +17,14 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.qjournal.server;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
||||
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
|
||||
@ -52,16 +43,21 @@
|
||||
import org.apache.hadoop.util.StopWatch;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.*;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.junit.rules.TestName;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
public class TestJournalNode {
|
||||
private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
|
||||
@ -87,9 +83,29 @@ public void setup() throws Exception {
|
||||
File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
|
||||
File.separator + "TestJournalNode");
|
||||
FileUtil.fullyDelete(editsDir);
|
||||
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
editsDir.getAbsolutePath());
|
||||
|
||||
if (testName.getMethodName().equals("testJournalDirPerNameSpace")) {
|
||||
setFederationConf();
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns1",
|
||||
editsDir + File.separator + "ns1");
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns2",
|
||||
editsDir + File.separator + "ns2");
|
||||
} else if (testName.getMethodName().equals(
|
||||
"testJournalCommonDirAcrossNameSpace")){
|
||||
setFederationConf();
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
editsDir.getAbsolutePath());
|
||||
} else if (testName.getMethodName().equals(
|
||||
"testJournalDefaultDirForOneNameSpace")) {
|
||||
FileUtil.fullyDelete(new File(DFSConfigKeys
|
||||
.DFS_JOURNALNODE_EDITS_DIR_DEFAULT));
|
||||
setFederationConf();
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY+ ".ns1",
|
||||
editsDir + File.separator + "ns1");
|
||||
} else {
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
editsDir.getAbsolutePath());
|
||||
}
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
|
||||
"0.0.0.0:0");
|
||||
if (testName.getMethodName().equals(
|
||||
@ -128,18 +144,102 @@ public void setup() throws Exception {
|
||||
jn = new JournalNode();
|
||||
jn.setConf(conf);
|
||||
jn.start();
|
||||
journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
|
||||
journal = jn.getOrCreateJournal(journalId);
|
||||
journal.format(FAKE_NSINFO);
|
||||
|
||||
|
||||
if (testName.getMethodName().equals("testJournalDirPerNameSpace") ||
|
||||
testName.getMethodName().equals(
|
||||
"testJournalCommonDirAcrossNameSpace") ||
|
||||
testName.getMethodName().equals(
|
||||
"testJournalDefaultDirForOneNameSpace")) {
|
||||
Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
for(String nsId: nameServiceIds) {
|
||||
journalId = "test-journalid-" + nsId;
|
||||
journal = jn.getOrCreateJournal(journalId, nsId,
|
||||
HdfsServerConstants.StartupOption.REGULAR);
|
||||
NamespaceInfo fakeNameSpaceInfo = new NamespaceInfo(
|
||||
12345, "mycluster", "my-bp"+nsId, 0L);
|
||||
journal.format(fakeNameSpaceInfo);
|
||||
}
|
||||
} else {
|
||||
journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
|
||||
journal = jn.getOrCreateJournal(journalId);
|
||||
journal.format(FAKE_NSINFO);
|
||||
}
|
||||
|
||||
|
||||
ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
|
||||
}
|
||||
|
||||
private void setFederationConf() {
|
||||
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1, ns2");
|
||||
|
||||
//ns1
|
||||
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
|
||||
"qjournal://journalnode0:9900;journalnode1:9901/ns1");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
|
||||
"qjournal://journalnode0:9900;journalnode1:9901/ns2");
|
||||
|
||||
//ns2
|
||||
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns2", "nn3,nn4");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns2" +".nn3",
|
||||
"qjournal://journalnode0:9900;journalnode1:9901/ns2");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns2" +".nn4",
|
||||
"qjournal://journalnode0:9900;journalnode1:9901/ns2");
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
jn.stop(0);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testJournalDirPerNameSpace() {
|
||||
Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
setupStaticHostResolution(2, "journalnode");
|
||||
for (String nsId : nameServiceIds) {
|
||||
String jid = "test-journalid-" + nsId;
|
||||
Journal nsJournal = jn.getJournal(jid);
|
||||
JNStorage journalStorage = nsJournal.getStorage();
|
||||
File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
|
||||
File.separator + "TestJournalNode" + File.separator
|
||||
+ nsId + File.separator + jid);
|
||||
assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testJournalCommonDirAcrossNameSpace() {
|
||||
Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
setupStaticHostResolution(2, "journalnode");
|
||||
for (String nsId : nameServiceIds) {
|
||||
String jid = "test-journalid-" + nsId;
|
||||
Journal nsJournal = jn.getJournal(jid);
|
||||
JNStorage journalStorage = nsJournal.getStorage();
|
||||
File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
|
||||
File.separator + "TestJournalNode" + File.separator + jid);
|
||||
assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testJournalDefaultDirForOneNameSpace() {
|
||||
Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
setupStaticHostResolution(2, "journalnode");
|
||||
String jid = "test-journalid-ns1";
|
||||
Journal nsJournal = jn.getJournal(jid);
|
||||
JNStorage journalStorage = nsJournal.getStorage();
|
||||
File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
|
||||
File.separator + "TestJournalNode" + File.separator + "ns1" + File
|
||||
.separator + jid);
|
||||
assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
|
||||
jid = "test-journalid-ns2";
|
||||
nsJournal = jn.getJournal(jid);
|
||||
journalStorage = nsJournal.getStorage();
|
||||
editsDir = new File(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT +
|
||||
File.separator + jid);
|
||||
assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
|
||||
}
|
||||
@Test(timeout=100000)
|
||||
public void testJournal() throws Exception {
|
||||
MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
|
||||
|
Loading…
Reference in New Issue
Block a user