From 1c4265d7bcda2409df5df5e990e80ba05fe4b19e Mon Sep 17 00:00:00 2001 From: Konstantin V Shvachko Date: Thu, 11 Oct 2018 22:39:17 -0700 Subject: [PATCH] HDFS-13523. Support observer nodes in MiniDFSCluster. Contributed by Konstantin Shvachko. --- .../hdfs/TestStateAlignmentContextWithHA.java | 61 +++++++++---------- .../hdfs/server/namenode/ha/HATestUtil.java | 40 +++++++++--- .../ha/TestConsistentReadsObserver.java | 19 +----- .../namenode/ha/TestMultiObserverNode.java | 16 +---- .../server/namenode/ha/TestObserverNode.java | 16 +---- 5 files changed, 67 insertions(+), 85 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java index a642872a7a..3dbeea7769 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java @@ -18,14 +18,15 @@ package org.apache.hadoop.hdfs; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; @@ -66,6 +67,7 @@ public class TestStateAlignmentContextWithHA { private static final Configuration CONF = new HdfsConfiguration(); private static final List AC_LIST = new ArrayList<>(); + private static MiniQJMHACluster qjmhaCluster; private static MiniDFSCluster cluster; private static List clients; @@ -87,33 +89,26 @@ public ORPPwithAlignmentContexts( @BeforeClass public static void startUpCluster() throws IOException { - // disable block scanner - CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // Set short retry timeouts so this test runs faster CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); - CONF.setBoolean("fs.hdfs.impl.disable.cache", true); + CONF.setBoolean(String.format( + "fs.%s.impl.disable.cache", HdfsConstants.HDFS_URI_SCHEME), true); + CONF.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, NUMDATANODES); - cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES) - .nnTopology(MiniDFSNNTopology.simpleHATopology(3)) - .build(); - cluster.waitActive(); - cluster.transitionToActive(0); - cluster.transitionToObserver(2); - - HATestUtil.setupHAConfiguration( - cluster, CONF, 0, ORPPwithAlignmentContexts.class); + qjmhaCluster = HATestUtil.setUpObserverCluster(CONF, 1, NUMDATANODES, true); + cluster = qjmhaCluster.getDfsCluster(); } @Before public void before() throws IOException, URISyntaxException { - dfs = (DistributedFileSystem) FileSystem.get(CONF); + dfs = HATestUtil.configureObserverReadFs( + cluster, CONF, ORPPwithAlignmentContexts.class, true); } @AfterClass public static void shutDownCluster() throws IOException { - if (cluster != null) { - cluster.shutdown(); - cluster = null; + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); } } @@ -144,9 +139,9 @@ public void testStateTransferOnWrite() throws Exception { long postWriteState = cluster.getNamesystem(active).getLastWrittenTransactionId(); // Write(s) should have increased state. Check for greater than. - assertThat(clientState > preWriteState, is(true)); + assertTrue(clientState > preWriteState); // Client and server state should be equal. - assertThat(clientState, is(postWriteState)); + assertEquals(clientState, postWriteState); } /** @@ -161,7 +156,7 @@ public void testStateTransferOnRead() throws Exception { DFSTestUtil.readFile(dfs, new Path("/testFile2")); // Read should catch client up to last written state. long clientState = getContext(0).getLastSeenStateId(); - assertThat(clientState, is(lastWrittenId)); + assertEquals(clientState, lastWrittenId); } /** @@ -173,12 +168,12 @@ public void testStateTransferOnFreshClient() throws Exception { DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz"); long lastWrittenId = cluster.getNamesystem(active).getLastWrittenTransactionId(); - try (DistributedFileSystem clearDfs = - (DistributedFileSystem) FileSystem.get(CONF)) { + try (DistributedFileSystem clearDfs = HATestUtil.configureObserverReadFs( + cluster, CONF, ORPPwithAlignmentContexts.class, true);) { ClientGSIContext clientState = getContext(1); - assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); + assertEquals(clientState.getLastSeenStateId(), Long.MIN_VALUE); DFSTestUtil.readFile(clearDfs, new Path("/testFile3")); - assertThat(clientState.getLastSeenStateId(), is(lastWrittenId)); + assertEquals(clientState.getLastSeenStateId(), lastWrittenId); } } @@ -196,9 +191,9 @@ public void testStateTransferOnWriteWithFailover() throws Exception { long postWriteState = cluster.getNamesystem(active).getLastWrittenTransactionId(); // Write(s) should have increased state. Check for greater than. - assertThat(clientState > preWriteState, is(true)); + assertTrue(clientState > preWriteState); // Client and server state should be equal. - assertThat(clientState, is(postWriteState)); + assertEquals(clientState, postWriteState); // Failover NameNode. failOver(); @@ -210,9 +205,9 @@ public void testStateTransferOnWriteWithFailover() throws Exception { cluster.getNamesystem(active).getLastWrittenTransactionId(); // Write(s) should have increased state. Check for greater than. - assertThat(clientStateFO > postWriteState, is(true)); + assertTrue(clientStateFO > postWriteState); // Client and server state should be equal. - assertThat(clientStateFO, is(writeStateFO)); + assertEquals(clientStateFO, writeStateFO); } @Test(timeout=300000) @@ -230,8 +225,8 @@ private void runClientsWithFailover(int clientStartId, ExecutorService execService = Executors.newFixedThreadPool(2); clients = new ArrayList<>(numClients); for (int i = clientStartId; i <= numClients; i++) { - DistributedFileSystem haClient = - (DistributedFileSystem) FileSystem.get(CONF); + DistributedFileSystem haClient = HATestUtil.configureObserverReadFs( + cluster, CONF, ORPPwithAlignmentContexts.class, true); clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i)); } @@ -248,7 +243,7 @@ private void runClientsWithFailover(int clientStartId, // Validation. for (Future future : futures) { - assertThat(future.get(), is(STATE.SUCCESS)); + assertEquals(future.get(), STATE.SUCCESS); } clients.clear(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index f4a766d283..d1095ad2d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSUtil.createUri; @@ -29,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.google.common.base.Function; @@ -164,17 +168,18 @@ public static DistributedFileSystem configureFailoverFs( return (DistributedFileSystem)fs; } - public static DistributedFileSystem configureObserverReadFs( + public static

> + DistributedFileSystem configureObserverReadFs( MiniDFSCluster cluster, Configuration conf, - boolean isObserverReadEnabled) + Class

classFPP, boolean isObserverReadEnabled) throws IOException, URISyntaxException { conf = new Configuration(conf); - setupHAConfiguration(cluster, conf, 0, ObserverReadProxyProvider.class); + setupHAConfiguration(cluster, conf, 0, classFPP); DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(getLogicalUri(cluster), conf); - ObserverReadProxyProvider provider = (ObserverReadProxyProvider) - ((RetryInvocationHandler) Proxy.getInvocationHandler( - dfs.getClient().getNamenode())).getProxyProvider(); + @SuppressWarnings("unchecked") + P provider = (P) ((RetryInvocationHandler) Proxy.getInvocationHandler( + dfs.getClient().getNamenode())).getProxyProvider(); provider.setObserverReadEnabled(isObserverReadEnabled); return dfs; } @@ -196,10 +201,25 @@ public static boolean isSentToAnyOfNameNodes( } public static MiniQJMHACluster setUpObserverCluster( - Configuration conf, int numObservers) throws IOException { - MiniQJMHACluster qjmhaCluster = new MiniQJMHACluster.Builder(conf) - .setNumNameNodes(2 + numObservers) - .build(); + Configuration conf, int numObservers, int numDataNodes, + boolean fastTailing) throws IOException { + // disable block scanner + conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, fastTailing); + if(fastTailing) { + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS); + } else { + // disable fast tailing so that coordination takes time. + conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); + conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); + } + + MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf) + .setNumNameNodes(2 + numObservers); + qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes); + MiniQJMHACluster qjmhaCluster = qjmBuilder.build(); MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster(); dfsCluster.transitionToActive(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 26ad3a2dab..3048842f36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -17,21 +17,16 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -46,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Test consistency of reads while accessing an ObserverNode. * The tests are based on traditional (non fast path) edits tailing. @@ -65,19 +59,11 @@ public class TestConsistentReadsObserver { @BeforeClass public static void startUpCluster() throws Exception { conf = new Configuration(); - // disable block scanner - conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // disable fast tailing here because this test's assertions are based on the // timing of explicitly called rollEditLogAndTail. Although this means this // test takes some time to run // TODO: revisit if there is a better way. - conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); - - // disable fast tailing so that coordination takes time. - conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); - conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); - - qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1); + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, false); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -177,6 +163,7 @@ private void assertSentTo(int nnIdx) throws IOException { } private static void setObserverRead(boolean flag) throws Exception { - dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag); + dfs = HATestUtil.configureObserverReadFs( + dfsCluster, conf, ObserverReadProxyProvider.class, flag); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java index ab1251e922..4aa3133a0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java @@ -17,17 +17,12 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; @@ -50,15 +45,10 @@ public class TestMultiObserverNode { @BeforeClass public static void startUpCluster() throws Exception { conf = new Configuration(); - // disable block scanner - conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); - conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true); - conf.setTimeDuration( - DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS); - - qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2); + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2, 0, true); dfsCluster = qjmhaCluster.getDfsCluster(); - dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, true); + dfs = HATestUtil.configureObserverReadFs( + dfsCluster, conf, ObserverReadProxyProvider.class, true); } @After diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 2c826e65b1..28fd330be4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -32,14 +30,11 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; @@ -77,13 +72,7 @@ public class TestObserverNode { @BeforeClass public static void startUpCluster() throws Exception { conf = new Configuration(); - // disable block scanner - conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); - conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true); - conf.setTimeDuration( - DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS); - - qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1); + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -302,6 +291,7 @@ private void assertSentTo(int nnIdx) throws IOException { } private static void setObserverRead(boolean flag) throws Exception { - dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag); + dfs = HATestUtil.configureObserverReadFs( + dfsCluster, conf, ObserverReadProxyProvider.class, flag); } }