HDFS-13523. Support observer nodes in MiniDFSCluster. Contributed by Konstantin Shvachko.

Konstantin V Shvachko 2018-10-11 22:39:17 -07:00
parent b5b9b77707
commit 1c4265d7bc
5 changed files with 67 additions and 85 deletions

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java

@@ -18,14 +18,15 @@
 package org.apache.hadoop.hdfs;

-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
@@ -66,6 +67,7 @@ public class TestStateAlignmentContextWithHA {
   private static final Configuration CONF = new HdfsConfiguration();
   private static final List<ClientGSIContext> AC_LIST = new ArrayList<>();

+  private static MiniQJMHACluster qjmhaCluster;
   private static MiniDFSCluster cluster;
   private static List<Worker> clients;
@@ -87,33 +89,26 @@ public ORPPwithAlignmentContexts(
   @BeforeClass
   public static void startUpCluster() throws IOException {
-    // disable block scanner
-    CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
     // Set short retry timeouts so this test runs faster
     CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-    CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
+    CONF.setBoolean(String.format(
+        "fs.%s.impl.disable.cache", HdfsConstants.HDFS_URI_SCHEME), true);
+    CONF.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, NUMDATANODES);

-    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology(3))
-        .build();
-    cluster.waitActive();
-    cluster.transitionToActive(0);
-    cluster.transitionToObserver(2);
-
-    HATestUtil.setupHAConfiguration(
-        cluster, CONF, 0, ORPPwithAlignmentContexts.class);
+    qjmhaCluster = HATestUtil.setUpObserverCluster(CONF, 1, NUMDATANODES, true);
+    cluster = qjmhaCluster.getDfsCluster();
   }

   @Before
   public void before() throws IOException, URISyntaxException {
-    dfs = (DistributedFileSystem) FileSystem.get(CONF);
+    dfs = HATestUtil.configureObserverReadFs(
+        cluster, CONF, ORPPwithAlignmentContexts.class, true);
   }

   @AfterClass
   public static void shutDownCluster() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
     }
   }
@@ -144,9 +139,9 @@ public void testStateTransferOnWrite() throws Exception {
     long postWriteState =
         cluster.getNamesystem(active).getLastWrittenTransactionId();
     // Write(s) should have increased state. Check for greater than.
-    assertThat(clientState > preWriteState, is(true));
+    assertTrue(clientState > preWriteState);
     // Client and server state should be equal.
-    assertThat(clientState, is(postWriteState));
+    assertEquals(clientState, postWriteState);
   }

   /**
@@ -161,7 +156,7 @@ public void testStateTransferOnRead() throws Exception {
     DFSTestUtil.readFile(dfs, new Path("/testFile2"));
     // Read should catch client up to last written state.
     long clientState = getContext(0).getLastSeenStateId();
-    assertThat(clientState, is(lastWrittenId));
+    assertEquals(clientState, lastWrittenId);
   }

   /**
@@ -173,12 +168,12 @@ public void testStateTransferOnFreshClient() throws Exception {
     DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
     long lastWrittenId =
         cluster.getNamesystem(active).getLastWrittenTransactionId();
-    try (DistributedFileSystem clearDfs =
-        (DistributedFileSystem) FileSystem.get(CONF)) {
+    try (DistributedFileSystem clearDfs = HATestUtil.configureObserverReadFs(
+        cluster, CONF, ORPPwithAlignmentContexts.class, true);) {
       ClientGSIContext clientState = getContext(1);
-      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
+      assertEquals(clientState.getLastSeenStateId(), Long.MIN_VALUE);
       DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
-      assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
+      assertEquals(clientState.getLastSeenStateId(), lastWrittenId);
     }
   }
@@ -196,9 +191,9 @@ public void testStateTransferOnWriteWithFailover() throws Exception {
     long postWriteState =
         cluster.getNamesystem(active).getLastWrittenTransactionId();
     // Write(s) should have increased state. Check for greater than.
-    assertThat(clientState > preWriteState, is(true));
+    assertTrue(clientState > preWriteState);
     // Client and server state should be equal.
-    assertThat(clientState, is(postWriteState));
+    assertEquals(clientState, postWriteState);

     // Failover NameNode.
     failOver();
@@ -210,9 +205,9 @@ public void testStateTransferOnWriteWithFailover() throws Exception {
         cluster.getNamesystem(active).getLastWrittenTransactionId();
     // Write(s) should have increased state. Check for greater than.
-    assertThat(clientStateFO > postWriteState, is(true));
+    assertTrue(clientStateFO > postWriteState);
     // Client and server state should be equal.
-    assertThat(clientStateFO, is(writeStateFO));
+    assertEquals(clientStateFO, writeStateFO);
   }

   @Test(timeout=300000)
@@ -230,8 +225,8 @@ private void runClientsWithFailover(int clientStartId,
     ExecutorService execService = Executors.newFixedThreadPool(2);
     clients = new ArrayList<>(numClients);
     for (int i = clientStartId; i <= numClients; i++) {
-      DistributedFileSystem haClient =
-          (DistributedFileSystem) FileSystem.get(CONF);
+      DistributedFileSystem haClient = HATestUtil.configureObserverReadFs(
+          cluster, CONF, ORPPwithAlignmentContexts.class, true);
       clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i));
     }
@@ -248,7 +243,7 @@ private void runClientsWithFailover(int clientStartId,
     // Validation.
     for (Future<STATE> future : futures) {
-      assertThat(future.get(), is(STATE.SUCCESS));
+      assertEquals(future.get(), STATE.SUCCESS);
     }
     clients.clear();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java

@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSUtil.createUri;
@@ -29,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;

 import com.google.common.base.Function;
@@ -164,17 +168,18 @@ public static DistributedFileSystem configureFailoverFs(
     return (DistributedFileSystem)fs;
   }

-  public static DistributedFileSystem configureObserverReadFs(
+  public static <P extends ObserverReadProxyProvider<?>>
+  DistributedFileSystem configureObserverReadFs(
       MiniDFSCluster cluster, Configuration conf,
-      boolean isObserverReadEnabled)
+      Class<P> classFPP, boolean isObserverReadEnabled)
       throws IOException, URISyntaxException {
     conf = new Configuration(conf);
-    setupHAConfiguration(cluster, conf, 0, ObserverReadProxyProvider.class);
+    setupHAConfiguration(cluster, conf, 0, classFPP);
     DistributedFileSystem dfs = (DistributedFileSystem)
         FileSystem.get(getLogicalUri(cluster), conf);
-    ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
-        ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
-            dfs.getClient().getNamenode())).getProxyProvider();
+    @SuppressWarnings("unchecked")
+    P provider = (P) ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+        dfs.getClient().getNamenode())).getProxyProvider();
     provider.setObserverReadEnabled(isObserverReadEnabled);
     return dfs;
   }
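Note: configureObserverReadFs is now generic in the proxy-provider class, so a
test can install its own ObserverReadProxyProvider subclass and still get a
DistributedFileSystem back (TestStateAlignmentContextWithHA above passes
ORPPwithAlignmentContexts). A minimal sketch of a caller, assuming a running
MiniDFSCluster and Configuration supplied by the enclosing test (helper name
hypothetical):

    // Hypothetical helper; cluster and conf come from the enclosing test.
    // Passing ObserverReadProxyProvider.class keeps stock behavior; a test
    // that needs to intercept RPCs would pass its own subclass instead.
    private static DistributedFileSystem newObserverReadClient(
        MiniDFSCluster cluster, Configuration conf) throws Exception {
      return HATestUtil.configureObserverReadFs(
          cluster, conf, ObserverReadProxyProvider.class, true);
    }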
@@ -196,10 +201,25 @@ public static boolean isSentToAnyOfNameNodes(
   }

   public static MiniQJMHACluster setUpObserverCluster(
-      Configuration conf, int numObservers) throws IOException {
-    MiniQJMHACluster qjmhaCluster = new MiniQJMHACluster.Builder(conf)
-        .setNumNameNodes(2 + numObservers)
-        .build();
+      Configuration conf, int numObservers, int numDataNodes,
+      boolean fastTailing) throws IOException {
+    // disable block scanner
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, fastTailing);
+    if(fastTailing) {
+      conf.setTimeDuration(
+          DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
+    } else {
+      // disable fast tailing so that coordination takes time.
+      conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
+      conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
+    }
+
+    MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
+        .setNumNameNodes(2 + numObservers);
+    qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes);
+    MiniQJMHACluster qjmhaCluster = qjmBuilder.build();
     MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
     dfsCluster.transitionToActive(0);
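Note: the widened setUpObserverCluster signature folds in what each test
previously configured by hand: it disables the DataNode block scanner, picks
fast in-progress edits tailing (100 ms period) or slow roll/tail periods,
sizes the NameNode set at 2 + numObservers, and sets the DataNode count on
the embedded MiniDFSCluster builder. A sketch of the resulting @BeforeClass
wiring, mirroring the updated tests below (field names illustrative):

    // One observer (three NameNodes total), no DataNodes, fast tailing.
    @BeforeClass
    public static void startUpCluster() throws Exception {
      conf = new Configuration();
      qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true);
      dfsCluster = qjmhaCluster.getDfsCluster();
      dfs = HATestUtil.configureObserverReadFs(
          dfsCluster, conf, ObserverReadProxyProvider.class, true);
    }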

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java

@@ -17,21 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;

-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -46,7 +41,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
  * Test consistency of reads while accessing an ObserverNode.
  * The tests are based on traditional (non fast path) edits tailing.
@@ -65,19 +59,11 @@ public class TestConsistentReadsObserver {
   @BeforeClass
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
-    // disable block scanner
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
     // disable fast tailing here because this test's assertions are based on the
     // timing of explicitly called rollEditLogAndTail. Although this means this
     // test takes some time to run
     // TODO: revisit if there is a better way.
-    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
-    // disable fast tailing so that coordination takes time.
-    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
-    conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
-
-    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
+    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, false);
     dfsCluster = qjmhaCluster.getDfsCluster();
   }
@@ -177,6 +163,7 @@ private void assertSentTo(int nnIdx) throws IOException {
   }

   private static void setObserverRead(boolean flag) throws Exception {
-    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag);
+    dfs = HATestUtil.configureObserverReadFs(
+        dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java

@@ -17,17 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;

-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
-import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
@@ -50,15 +45,10 @@ public class TestMultiObserverNode {
   @BeforeClass
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
-    // disable block scanner
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
-    conf.setTimeDuration(
-        DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
-
-    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2);
+    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2, 0, true);
     dfsCluster = qjmhaCluster.getDfsCluster();
-    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, true);
+    dfs = HATestUtil.configureObserverReadFs(
+        dfsCluster, conf, ObserverReadProxyProvider.class, true);
   }

   @After

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;

-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -32,14 +30,11 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -77,13 +72,7 @@ public class TestObserverNode {
   @BeforeClass
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
-    // disable block scanner
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
-    conf.setTimeDuration(
-        DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
-
-    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
+    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true);
     dfsCluster = qjmhaCluster.getDfsCluster();
   }
@@ -302,6 +291,7 @@ private void assertSentTo(int nnIdx) throws IOException {
   }

   private static void setObserverRead(boolean flag) throws Exception {
-    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag);
+    dfs = HATestUtil.configureObserverReadFs(
+        dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
 }