diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 780a0f6718..f4903931d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -281,7 +281,7 @@ public Void run() throws Exception { } @VisibleForTesting - void doTailEdits() throws IOException, InterruptedException { + public void doTailEdits() throws IOException, InterruptedException { // Write lock needs to be interruptible here because the // transitionToActive RPC takes the write lock before calling // tailer.stop() -- so if we're not interruptible, it will diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 514a010992..446b9140d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2652,8 +2652,14 @@ public void transitionToObserver(int nnIndex) throws IOException, getNameNode(nnIndex).getRpcServer().transitionToObserver( new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED)); } - - + + public void rollEditLogAndTail(int nnIndex) throws Exception { + getNameNode(nnIndex).getRpcServer().rollEditLog(); + for (int i = 2; i < getNumNameNodes(); i++) { + getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + } + } + public void triggerBlockReports() throws IOException { for (DataNode dn : getDataNodes()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java index a49425260e..a642872a7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java @@ -100,10 +100,8 @@ public static void startUpCluster() throws IOException { cluster.transitionToActive(0); cluster.transitionToObserver(2); - String nameservice = HATestUtil.getLogicalHostname(cluster); - HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0); - CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + - "." 
+ nameservice, ORPPwithAlignmentContexts.class.getName()); + HATestUtil.setupHAConfiguration( + cluster, CONF, 0, ORPPwithAlignmentContexts.class); } @Before diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java index 6a68bd4331..3ece3d7e47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import java.io.IOException; @@ -171,7 +172,8 @@ private Configuration initHAConf(URI journalURI, Builder builder, } // use standard failover configurations - HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns); + HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns, + ConfiguredFailoverProxyProvider.class); return conf; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 9a9455415e..ebd5faf502 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.mockito.Mockito.spy; @@ -176,6 +177,11 @@ public static long getLeaseRenewalTime(NameNode nn, String path) { return l == null ? -1 : l.getLastUpdate(); } + + public static HAServiceState getServiceState(NameNode nn) { + return nn.getServiceState(); + } + /** * Return the datanode descriptor for the given datanode. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index cc5b3d4d8b..f4a766d283 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -19,8 +19,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSUtil.createUri; import java.io.IOException; +import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -32,6 +34,7 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -42,10 +45,12 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryInvocationHandler; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; @@ -158,17 +163,66 @@ public static DistributedFileSystem configureFailoverFs( FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); return (DistributedFileSystem)fs; } - + public static DistributedFileSystem configureObserverReadFs( MiniDFSCluster cluster, Configuration conf, - int nsIndex) throws IOException, URISyntaxException { + boolean isObserverReadEnabled) + throws IOException, URISyntaxException { conf = new Configuration(conf); - String logicalName = getLogicalHostname(cluster); - setFailoverConfigurations(cluster, conf, logicalName, nsIndex); - conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + - logicalName, ObserverReadProxyProvider.class.getName()); - FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); - return (DistributedFileSystem) fs; + setupHAConfiguration(cluster, conf, 0, ObserverReadProxyProvider.class); + DistributedFileSystem dfs = (DistributedFileSystem) + FileSystem.get(getLogicalUri(cluster), conf); + ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>) + ((RetryInvocationHandler<?>) Proxy.getInvocationHandler( + dfs.getClient().getNamenode())).getProxyProvider(); + provider.setObserverReadEnabled(isObserverReadEnabled); + return dfs; + } + + public static boolean isSentToAnyOfNameNodes( + DistributedFileSystem dfs, + MiniDFSCluster cluster, int...
nnIndices) throws IOException { + ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>) + ((RetryInvocationHandler<?>) Proxy.getInvocationHandler( + dfs.getClient().getNamenode())).getProxyProvider(); + FailoverProxyProvider.ProxyInfo<?> pi = provider.getLastProxy(); + for (int nnIdx : nnIndices) { + if (pi.proxyInfo.equals( + cluster.getNameNode(nnIdx).getNameNodeAddress().toString())) { + return true; + } + } + return false; + } + + public static MiniQJMHACluster setUpObserverCluster( + Configuration conf, int numObservers) throws IOException { + MiniQJMHACluster qjmhaCluster = new MiniQJMHACluster.Builder(conf) + .setNumNameNodes(2 + numObservers) + .build(); + MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster(); + + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); + + for (int i = 0; i < numObservers; i++) { + dfsCluster.transitionToObserver(2 + i); + } + return qjmhaCluster; + } + + public static
<P extends FailoverProxyProvider<?>> + void setupHAConfiguration(MiniDFSCluster cluster, + Configuration conf, int nsIndex, Class<P>
classFPP) { + MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); + List<String> nnAddresses = new ArrayList<String>(); + for (MiniDFSCluster.NameNodeInfo nn : nns) { + InetSocketAddress addr = nn.nameNode.getNameNodeAddress(); + nnAddresses.add( + createUri(HdfsConstants.HDFS_URI_SCHEME, addr).toString()); + } + setFailoverConfigurations( + conf, getLogicalHostname(cluster), nnAddresses, classFPP); } public static void setFailoverConfigurations(MiniDFSCluster cluster, @@ -211,11 +265,13 @@ public static void setFailoverConfigurations(Configuration conf, public String apply(InetSocketAddress addr) { return "hdfs://" + addr.getHostName() + ":" + addr.getPort(); } - })); + }), ConfiguredFailoverProxyProvider.class); } - public static void setFailoverConfigurations(Configuration conf, String logicalName, - Iterable<String> nnAddresses) { + public static
<P extends FailoverProxyProvider<?>> + void setFailoverConfigurations( + Configuration conf, String logicalName, + Iterable<String> nnAddresses, Class<P>
classFPP) { List<String> nnids = new ArrayList<String>(); int i = 0; for (String address : nnAddresses) { @@ -227,8 +283,8 @@ public static void setFailoverConfigurations(Configuration conf, String logicalN conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName), Joiner.on(',').join(nnids)); - conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, - ConfiguredFailoverProxyProvider.class.getName()); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + logicalName, classFPP.getName()); conf.set("fs.defaultFS", "hdfs://" + logicalName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java new file mode 100644 index 0000000000..26ad3a2dab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test consistency of reads while accessing an ObserverNode. + * The tests are based on traditional (non-fast-path) edits tailing. + */ +public class TestConsistentReadsObserver { + public static final Logger LOG = + LoggerFactory.getLogger(TestConsistentReadsObserver.class.getName()); + + private static Configuration conf; + private static MiniQJMHACluster qjmhaCluster; + private static MiniDFSCluster dfsCluster; + private static DistributedFileSystem dfs; + + private final Path testPath = new Path("/TestConsistentReadsObserver"); + + @BeforeClass + public static void startUpCluster() throws Exception { + conf = new Configuration(); + // disable block scanner + conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + // disable fast tailing here because this test's assertions are based on the + // timing of explicitly called rollEditLogAndTail. Although this means the + // test takes some time to run. + // TODO: revisit if there is a better way. + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); + + // use a long log roll period and tailing interval so that coordination takes time.
+ conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); + conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); + + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1); + dfsCluster = qjmhaCluster.getDfsCluster(); + } + + @Before + public void setUp() throws Exception { + setObserverRead(true); + } + + @After + public void cleanUp() throws IOException { + dfs.delete(testPath, true); + } + + @AfterClass + public static void shutDownCluster() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + @Test + public void testMsyncSimple() throws Exception { + // 0 == not completed, 1 == succeeded, -1 == failed + AtomicInteger readStatus = new AtomicInteger(0); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + Thread reader = new Thread(() -> { + try { + // this read will block until roll and tail edits happen. + dfs.getFileStatus(testPath); + readStatus.set(1); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + + reader.start(); + // the reader is still blocking; it has not succeeded yet. + assertEquals(0, readStatus.get()); + dfsCluster.rollEditLogAndTail(0); + // wait a while for all the changes to be done + GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000); + // the reader should have succeeded. + assertEquals(1, readStatus.get()); + } + + // @Ignore("Move to another test file") + @Test + public void testUncoordinatedCall() throws Exception { + // make a write call so that the client will be ahead of + // the observer for now. + dfs.mkdir(testPath, FsPermission.getDefault()); + + // a status flag: initialized to 0; set to 1 after the reader finishes, + // or to -1 on error + AtomicInteger readStatus = new AtomicInteger(0); + + // create a separate thread to make a blocking read. + Thread reader = new Thread(() -> { + try { + // this read call will block until the server state catches up. But due + // to the configuration, this will take a very long time. + dfs.getClient().getFileInfo("/"); + readStatus.set(1); + fail("Should have been interrupted before getting here."); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + reader.start(); + + long before = Time.now(); + dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL); + long after = Time.now(); + + // should succeed immediately, because datanodeReport is marked an + // uncoordinated call, and will not be waiting for the server to catch up. + assertTrue(after - before < 200); + // by this time, the reader thread should still be blocking, so the status + // is not updated + assertEquals(0, readStatus.get()); + Thread.sleep(5000); + // reader thread status should still be unchanged after 5 sec...
+ assertEquals(0, readStatus.get()); + // and the reader thread is not dead, so it must still be waiting + assertEquals(Thread.State.WAITING, reader.getState()); + reader.interrupt(); + } + + private void assertSentTo(int nnIdx) throws IOException { + assertTrue("Request was not sent to the expected namenode " + nnIdx, + HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); + } + + private static void setObserverRead(boolean flag) throws Exception { + dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java new file mode 100644 index 0000000000..ab1251e922 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests multiple ObserverNodes. + */ +public class TestMultiObserverNode { + private static Configuration conf; + private static MiniQJMHACluster qjmhaCluster; + private static MiniDFSCluster dfsCluster; + private static DistributedFileSystem dfs; + + private final Path testPath = new Path("/TestMultiObserverNode"); + + @BeforeClass + public static void startUpCluster() throws Exception { + conf = new Configuration(); + // disable block scanner + conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS); + + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2); + dfsCluster = qjmhaCluster.getDfsCluster(); + dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, true); + } + + @After + public void cleanUp() throws IOException { + dfs.delete(testPath, true); + } + + @AfterClass + public static void shutDownCluster() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + @Test + public void testObserverFailover() throws Exception { + dfs.mkdir(testPath, FsPermission.getDefault()); + dfsCluster.rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2, 3); + + // Transition observer #2 to standby, request should go to #3.
+ dfsCluster.transitionToStandby(2); + dfs.getFileStatus(testPath); + assertSentTo(3); + + // Transition observer #3 to standby, request should go to active + dfsCluster.transitionToStandby(3); + dfs.getFileStatus(testPath); + assertSentTo(0); + + // Transition #2 back to observer, request should go to #2 + dfsCluster.transitionToObserver(2); + dfs.getFileStatus(testPath); + assertSentTo(2); + + // Transition #3 back to observer, request should go to either #2 or #3 + dfsCluster.transitionToObserver(3); + dfs.getFileStatus(testPath); + assertSentTo(2, 3); + } + + @Test + public void testMultiObserver() throws Exception { + Path testPath2 = new Path(testPath, "test2"); + Path testPath3 = new Path(testPath, "test3"); + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + dfsCluster.rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2, 3); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + dfsCluster.rollEditLogAndTail(0); + + // Shutdown first observer, request should go to the second one + dfsCluster.shutdownNameNode(2); + dfs.listStatus(testPath2); + assertSentTo(3); + + // Restart the first observer + dfsCluster.restartNameNode(2); + dfs.listStatus(testPath); + assertSentTo(3); + + dfsCluster.transitionToObserver(2); + dfs.listStatus(testPath); + assertSentTo(2, 3); + + dfs.mkdir(testPath3, FsPermission.getDefault()); + dfsCluster.rollEditLogAndTail(0); + + // Now shutdown the second observer, request should go to the first one + dfsCluster.shutdownNameNode(3); + dfs.listStatus(testPath3); + assertSentTo(2); + + // Shutdown both, request should go to active + dfsCluster.shutdownNameNode(2); + dfs.listStatus(testPath3); + assertSentTo(0); + + dfsCluster.restartNameNode(2); + dfsCluster.transitionToObserver(2); + dfsCluster.restartNameNode(3); + dfsCluster.transitionToObserver(3); + } + + private void assertSentTo(int... 
nnIndices) throws IOException { + assertTrue("Request was not sent to any of the expected namenodes.", + HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index c9e79fa615..2c826e65b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -17,83 +17,94 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; +import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryInvocationHandler; -import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Proxy; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static 
org.junit.Assert.fail; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyShort; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; - -// Main unit tests for ObserverNode +/** + * Test main functionality of ObserverNode. + */ public class TestObserverNode { - private Configuration conf; - private MiniQJMHACluster qjmhaCluster; - private MiniDFSCluster dfsCluster; - private NameNode[] namenodes; - private Path testPath; - private Path testPath2; - private Path testPath3; + public static final Logger LOG = + LoggerFactory.getLogger(TestObserverNode.class.getName()); - /** These are set in each individual test case */ - private DistributedFileSystem dfs; - private ObserverReadProxyProvider provider; + private static Configuration conf; + private static MiniQJMHACluster qjmhaCluster; + private static MiniDFSCluster dfsCluster; + private static DistributedFileSystem dfs; - @Before - public void setUp() throws Exception { + private final Path testPath= new Path("/TestObserverNode"); + + @BeforeClass + public static void startUpCluster() throws Exception { conf = new Configuration(); + // disable block scanner + conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true); conf.setTimeDuration( DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS); - testPath = new Path("/test"); - testPath2 = new Path("/test2"); - testPath3 = new Path("/test3"); + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1); + dfsCluster = qjmhaCluster.getDfsCluster(); + } + + @Before + public void setUp() throws Exception { + setObserverRead(true); } @After public void cleanUp() throws IOException { + dfs.delete(testPath, true); + assertEquals("NN[0] should be active", HAServiceState.ACTIVE, + getServiceState(dfsCluster.getNameNode(0))); + assertEquals("NN[1] should be standby", HAServiceState.STANDBY, + getServiceState(dfsCluster.getNameNode(1))); + assertEquals("NN[2] should be observer", HAServiceState.OBSERVER, + getServiceState(dfsCluster.getNameNode(2))); + } + + @AfterClass + public static void shutDownCluster() throws IOException { if (qjmhaCluster != null) { qjmhaCluster.shutdown(); } @@ -101,13 +112,12 @@ public void cleanUp() throws IOException { @Test public void testSimpleRead() throws Exception { - setUpCluster(1); - setObserverRead(true); + Path testPath2 = new Path(testPath, "test2"); dfs.mkdir(testPath, FsPermission.getDefault()); assertSentTo(0); - rollEditLogAndTail(0); + dfsCluster.rollEditLogAndTail(0); dfs.getFileStatus(testPath); assertSentTo(2); @@ -117,7 +127,7 @@ public void testSimpleRead() throws Exception { @Test public void testFailover() throws Exception { - setUpCluster(1); + Path testPath2 = new Path(testPath, "test2"); setObserverRead(false); dfs.mkdir(testPath, FsPermission.getDefault()); @@ -127,23 +137,26 @@ public void testFailover() throws Exception { dfsCluster.transitionToStandby(0); dfsCluster.transitionToActive(1); - dfsCluster.waitActive(); + dfsCluster.waitActive(1); dfs.mkdir(testPath2, FsPermission.getDefault()); assertSentTo(1); dfs.getFileStatus(testPath); assertSentTo(1); + + dfsCluster.transitionToStandby(1); + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); } @Test public void testDoubleFailover() throws Exception { - setUpCluster(1); - setObserverRead(true); - + Path testPath2 = 
new Path(testPath, "test2"); + Path testPath3 = new Path(testPath, "test3"); dfs.mkdir(testPath, FsPermission.getDefault()); assertSentTo(0); - rollEditLogAndTail(0); + dfsCluster.rollEditLogAndTail(0); dfs.getFileStatus(testPath); assertSentTo(2); dfs.mkdir(testPath2, FsPermission.getDefault()); @@ -153,7 +166,7 @@ public void testDoubleFailover() throws Exception { dfsCluster.transitionToActive(1); dfsCluster.waitActive(1); - rollEditLogAndTail(1); + dfsCluster.rollEditLogAndTail(1); dfs.getFileStatus(testPath2); assertSentTo(2); dfs.mkdir(testPath3, FsPermission.getDefault()); @@ -163,51 +176,17 @@ public void testDoubleFailover() throws Exception { dfsCluster.transitionToActive(0); dfsCluster.waitActive(0); - rollEditLogAndTail(0); + dfsCluster.rollEditLogAndTail(0); dfs.getFileStatus(testPath3); assertSentTo(2); dfs.delete(testPath3, false); assertSentTo(0); } - @Test - public void testObserverFailover() throws Exception { - setUpCluster(2); - setObserverRead(true); - - dfs.mkdir(testPath, FsPermission.getDefault()); - rollEditLogAndTail(0); - dfs.getFileStatus(testPath); - assertSentToAny(2, 3); - - // Transition observer #2 to standby, request should go to the #3. - dfsCluster.transitionToStandby(2); - dfs.getFileStatus(testPath); - assertSentTo(3); - - // Transition observer #3 to standby, request should go to active - dfsCluster.transitionToStandby(3); - dfs.getFileStatus(testPath); - assertSentTo(0); - - // Transition #2 back to observer, request should go to #2 - dfsCluster.transitionToObserver(2); - dfs.getFileStatus(testPath); - assertSentTo(2); - - // Transition #3 back to observer, request should go to either #2 or #3 - dfsCluster.transitionToObserver(3); - dfs.getFileStatus(testPath); - assertSentToAny(2, 3); - } - @Test public void testObserverShutdown() throws Exception { - setUpCluster(1); - setObserverRead(true); - dfs.mkdir(testPath, FsPermission.getDefault()); - rollEditLogAndTail(0); + dfsCluster.rollEditLogAndTail(0); dfs.getFileStatus(testPath); assertSentTo(2); @@ -228,18 +207,14 @@ public void testObserverShutdown() throws Exception { @Test public void testObserverFailOverAndShutdown() throws Exception { - setUpCluster(1); - // Test the case when there is a failover before ONN shutdown - setObserverRead(true); - dfs.mkdir(testPath, FsPermission.getDefault()); - rollEditLogAndTail(0); + dfsCluster.rollEditLogAndTail(0); dfs.getFileStatus(testPath); assertSentTo(2); dfsCluster.transitionToStandby(0); dfsCluster.transitionToActive(1); - dfsCluster.waitActive(); + dfsCluster.waitActive(1); // Shutdown the observer - requests should go to active dfsCluster.shutdownNameNode(2); @@ -257,54 +232,14 @@ public void testObserverFailOverAndShutdown() throws Exception { // the second will properly go to the observer dfs.getFileStatus(testPath); assertSentTo(2); - } - @Test - public void testMultiObserver() throws Exception { - setUpCluster(2); - setObserverRead(true); - - dfs.mkdir(testPath, FsPermission.getDefault()); - assertSentTo(0); - - rollEditLogAndTail(0); - dfs.getFileStatus(testPath); - assertSentToAny(2, 3); - - dfs.mkdir(testPath2, FsPermission.getDefault()); - rollEditLogAndTail(0); - - // Shutdown first observer, request should go to the second one - dfsCluster.shutdownNameNode(2); - dfs.listStatus(testPath2); - assertSentTo(3); - - // Restart the first observer - dfsCluster.restartNameNode(2); - dfs.listStatus(testPath); - assertSentTo(3); - - dfsCluster.transitionToObserver(2); - dfs.listStatus(testPath); - assertSentToAny(2, 3); - - dfs.mkdir(testPath3, 
FsPermission.getDefault()); - rollEditLogAndTail(0); - - // Now shutdown the second observer, request should go to the first one - dfsCluster.shutdownNameNode(3); - dfs.listStatus(testPath3); - assertSentTo(2); - - // Shutdown both, request should go to active - dfsCluster.shutdownNameNode(2); - dfs.listStatus(testPath3); - assertSentTo(0); + dfsCluster.transitionToStandby(1); + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); } @Test public void testBootstrap() throws Exception { - setUpCluster(1); for (URI u : dfsCluster.getNameDirs(2)) { File dir = new File(u.getPath()); assertTrue(FileUtil.fullyDelete(dir)); @@ -323,20 +258,12 @@ public void testBootstrap() throws Exception { */ @Test public void testObserverNodeSafeModeWithBlockLocations() throws Exception { - setUpCluster(1); - setObserverRead(true); - - // Avoid starting DNs for the mini cluster. - BlockManager bmSpy = NameNodeAdapter.spyOnBlockManager(namenodes[0]); - doNothing().when(bmSpy) - .verifyReplication(anyString(), anyShort(), anyString()); - // Create a new file - the request should go to active. - dfs.createNewFile(testPath); + dfs.create(testPath, (short)1).close(); assertSentTo(0); - rollEditLogAndTail(0); - dfs.open(testPath); + dfsCluster.rollEditLogAndTail(0); + dfs.open(testPath).close(); assertSentTo(2); // Set observer to safe mode. @@ -345,7 +272,8 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception { // Mock block manager for observer to generate some fake blocks which // will trigger the (retriable) safe mode exception. final DatanodeInfo[] empty = {}; - bmSpy = NameNodeAdapter.spyOnBlockManager(namenodes[2]); + BlockManager bmSpy = + NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2)); doAnswer((invocation) -> { ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); LocatedBlock fakeBlock = new LocatedBlock(b, empty); @@ -357,158 +285,23 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception { // Open the file again - it should throw retriable exception and then // failover to active. - dfs.open(testPath); + dfs.open(testPath).close(); assertSentTo(0); // Remove safe mode on observer, request should still go to it. dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE); - dfs.open(testPath); + dfs.open(testPath).close(); assertSentTo(2); + + Mockito.reset(bmSpy); } - // TODO this does not currently work because fetching the service state from - // e.g. the StandbyNameNode also waits for the transaction ID to catch up. - // This is disabled pending HDFS-13872 and HDFS-13749. - @Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed") - @Test - public void testMsyncSimple() throws Exception { - // disable fast path here because this test's assertions are based on the - // timing of explicitly called rollEditLogAndTail. Although this means this - // test takes some time to run - // TODO: revisit if there is a better way. - conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); - conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS); - conf.setTimeDuration( - DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS); - setUpCluster(1); - setObserverRead(true); - - // 0 == not completed, 1 == succeeded, -1 == failed - AtomicInteger readStatus = new AtomicInteger(0); - - dfs.mkdir(testPath, FsPermission.getDefault()); - assertSentTo(0); - - Thread reader = new Thread(() -> { - try { - // this read will block until roll and tail edits happen. 
- dfs.getFileStatus(testPath); - readStatus.set(1); - } catch (IOException e) { - e.printStackTrace(); - readStatus.set(-1); - } - }); - - reader.start(); - // the reader is still blocking, not succeeded yet. - assertEquals(0, readStatus.get()); - rollEditLogAndTail(0); - // wait a while for all the change to be done - GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000); - // the reader should have succeed. - assertEquals(1, readStatus.get()); + private void assertSentTo(int nnIdx) throws IOException { + assertTrue("Request was not sent to the expected namenode " + nnIdx, + HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); } - @Test - public void testUncoordinatedCall() throws Exception { - // disable fast tailing so that coordination takes time. - conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); - conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); - conf.setTimeDuration( - DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); - setUpCluster(1); - setObserverRead(true); - - // make a write call so that client will be ahead of - // observer for now. - dfs.mkdir(testPath, FsPermission.getDefault()); - - // a status flag, initialized to 0, after reader finished, this will be - // updated to 1, -1 on error - AtomicInteger readStatus = new AtomicInteger(0); - - // create a separate thread to make a blocking read. - Thread reader = new Thread(() -> { - try { - // this read call will block until server state catches up. But due to - // configuration, this will take a very long time. - dfs.getClient().getFileInfo("/"); - readStatus.set(1); - fail("Should have been interrupted before getting here."); - } catch (IOException e) { - e.printStackTrace(); - readStatus.set(-1); - } - }); - reader.start(); - - long before = System.currentTimeMillis(); - dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL); - long after = System.currentTimeMillis(); - - // should succeed immediately, because datanodeReport is marked an - // uncoordinated call, and will not be waiting for server to catch up. - assertTrue(after - before < 200); - // by this time, reader thread should still be blocking, so the status not - // updated - assertEquals(0, readStatus.get()); - Thread.sleep(5000); - // reader thread status should still be unchanged after 5 sec... - assertEquals(0, readStatus.get()); - // and the reader thread is not dead, so it must be still waiting - assertEquals(Thread.State.WAITING, reader.getState()); - reader.interrupt(); - } - - private void setUpCluster(int numObservers) throws Exception { - qjmhaCluster = new MiniQJMHACluster.Builder(conf) - .setNumNameNodes(2 + numObservers) - .build(); - dfsCluster = qjmhaCluster.getDfsCluster(); - - namenodes = new NameNode[2 + numObservers]; - for (int i = 0; i < namenodes.length; i++) { - namenodes[i] = dfsCluster.getNameNode(i); - } - - dfsCluster.transitionToActive(0); - dfsCluster.waitActive(0); - - for (int i = 0; i < numObservers; i++) { - dfsCluster.transitionToObserver(2 + i); - } - } - - private void assertSentTo(int nnIdx) { - assertSentToAny(nnIdx); - } - - private void assertSentToAny(int... 
nnIndices) { - FailoverProxyProvider.ProxyInfo<?> pi = provider.getLastProxy(); - for (int nnIdx : nnIndices) { - if (pi.proxyInfo.equals( - dfsCluster.getNameNode(nnIdx).getNameNodeAddress().toString())) { - return; - } - } - fail("Request was not sent to any of the expected namenodes"); - } - - private void setObserverRead(boolean flag) throws Exception { - dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, 0); - RetryInvocationHandler<?> handler = - (RetryInvocationHandler<?>) Proxy.getInvocationHandler( - dfs.getClient().getNamenode()); - provider = (ObserverReadProxyProvider<?>) handler.getProxyProvider(); - provider.setObserverReadEnabled(flag); - } - - private void rollEditLogAndTail(int indexForActiveNN) throws Exception { - dfsCluster.getNameNode(indexForActiveNN).getRpcServer().rollEditLog(); - for (int i = 2; i < namenodes.length; i++) { - dfsCluster.getNameNode(i).getNamesystem().getEditLogTailer() - .doTailEdits(); - } + private static void setObserverRead(boolean flag) throws Exception { + dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag); + } }
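Reviewer note: the following is a minimal usage sketch, not part of the patch, showing how the helpers introduced here (HATestUtil.setUpObserverCluster, HATestUtil.configureObserverReadFs, HATestUtil.isSentToAnyOfNameNodes, and MiniDFSCluster#rollEditLogAndTail) compose in a test. The class name TestObserverReadUsageSketch and the path "/sketch" are hypothetical.

package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.junit.Test;

// Hypothetical class, for illustration only; not part of this patch.
public class TestObserverReadUsageSketch {
  @Test
  public void testReadGoesToObserver() throws Exception {
    Configuration conf = new Configuration();
    // Starts 2 + 1 NameNodes: NN0 becomes active, NN1 stays standby,
    // NN2 is transitioned to observer.
    MiniQJMHACluster qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
    MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
    try {
      // Client side: ObserverReadProxyProvider with observer reads enabled.
      DistributedFileSystem dfs =
          HATestUtil.configureObserverReadFs(dfsCluster, conf, true);

      Path p = new Path("/sketch");
      dfs.mkdir(p, FsPermission.getDefault());
      // Writes are always served by the active NameNode (NN0).
      assertTrue(HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, 0));

      // Deterministically publish the edits to the observer, then read.
      dfsCluster.rollEditLogAndTail(0);
      dfs.getFileStatus(p);
      // The read should have been served by the observer (NN2).
      assertTrue(HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, 2));
    } finally {
      qjmhaCluster.shutdown();
    }
  }
}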