HDFS-13961. [SBN read] TestObserverNode refactoring. Contributed by Konstantin Shvachko.

This commit is contained in:
Konstantin V Shvachko 2018-10-05 15:03:38 -07:00
parent a65bb97f5d
commit b5b9b77707
9 changed files with 518 additions and 320 deletions

View File

@ -281,7 +281,7 @@ public Void run() throws Exception {
}
@VisibleForTesting
void doTailEdits() throws IOException, InterruptedException {
public void doTailEdits() throws IOException, InterruptedException {
// Write lock needs to be interruptible here because the
// transitionToActive RPC takes the write lock before calling
// tailer.stop() -- so if we're not interruptible, it will

View File

@ -2653,6 +2653,12 @@ public void transitionToObserver(int nnIndex) throws IOException,
new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
}
public void rollEditLogAndTail(int nnIndex) throws Exception {
getNameNode(nnIndex).getRpcServer().rollEditLog();
for (int i = 2; i < getNumNameNodes(); i++) {
getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
}
}
public void triggerBlockReports()
throws IOException {

View File

@ -100,10 +100,8 @@ public static void startUpCluster() throws IOException {
cluster.transitionToActive(0);
cluster.transitionToObserver(2);
String nameservice = HATestUtil.getLogicalHostname(cluster);
HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0);
CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
"." + nameservice, ORPPwithAlignmentContexts.class.getName());
HATestUtil.setupHAConfiguration(
cluster, CONF, 0, ORPPwithAlignmentContexts.class);
}
@Before

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import java.io.IOException;
@ -171,7 +172,8 @@ private Configuration initHAConf(URI journalURI, Builder builder,
}
// use standard failover configurations
HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns);
HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns,
ConfiguredFailoverProxyProvider.class);
return conf;
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.spy;
@ -176,6 +177,11 @@ public static long getLeaseRenewalTime(NameNode nn, String path) {
return l == null ? -1 : l.getLastUpdate();
}
public static HAServiceState getServiceState(NameNode nn) {
return nn.getServiceState();
}
/**
* Return the datanode descriptor for the given datanode.
*/

View File

@ -19,8 +19,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@ -32,6 +34,7 @@
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -42,10 +45,12 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
@ -161,14 +166,63 @@ public static DistributedFileSystem configureFailoverFs(
public static DistributedFileSystem configureObserverReadFs(
MiniDFSCluster cluster, Configuration conf,
int nsIndex) throws IOException, URISyntaxException {
boolean isObserverReadEnabled)
throws IOException, URISyntaxException {
conf = new Configuration(conf);
String logicalName = getLogicalHostname(cluster);
setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." +
logicalName, ObserverReadProxyProvider.class.getName());
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
return (DistributedFileSystem) fs;
setupHAConfiguration(cluster, conf, 0, ObserverReadProxyProvider.class);
DistributedFileSystem dfs = (DistributedFileSystem)
FileSystem.get(getLogicalUri(cluster), conf);
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider();
provider.setObserverReadEnabled(isObserverReadEnabled);
return dfs;
}
public static boolean isSentToAnyOfNameNodes(
DistributedFileSystem dfs,
MiniDFSCluster cluster, int... nnIndices) throws IOException {
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider();
FailoverProxyProvider.ProxyInfo<?> pi = provider.getLastProxy();
for (int nnIdx : nnIndices) {
if (pi.proxyInfo.equals(
cluster.getNameNode(nnIdx).getNameNodeAddress().toString())) {
return true;
}
}
return false;
}
public static MiniQJMHACluster setUpObserverCluster(
Configuration conf, int numObservers) throws IOException {
MiniQJMHACluster qjmhaCluster = new MiniQJMHACluster.Builder(conf)
.setNumNameNodes(2 + numObservers)
.build();
MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
for (int i = 0; i < numObservers; i++) {
dfsCluster.transitionToObserver(2 + i);
}
return qjmhaCluster;
}
public static <P extends FailoverProxyProvider<?>>
void setupHAConfiguration(MiniDFSCluster cluster,
Configuration conf, int nsIndex, Class<P> classFPP) {
MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
List<String> nnAddresses = new ArrayList<String>();
for (MiniDFSCluster.NameNodeInfo nn : nns) {
InetSocketAddress addr = nn.nameNode.getNameNodeAddress();
nnAddresses.add(
createUri(HdfsConstants.HDFS_URI_SCHEME, addr).toString());
}
setFailoverConfigurations(
conf, getLogicalHostname(cluster), nnAddresses, classFPP);
}
public static void setFailoverConfigurations(MiniDFSCluster cluster,
@ -211,11 +265,13 @@ public static void setFailoverConfigurations(Configuration conf,
public String apply(InetSocketAddress addr) {
return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
}
}));
}), ConfiguredFailoverProxyProvider.class);
}
public static void setFailoverConfigurations(Configuration conf, String logicalName,
Iterable<String> nnAddresses) {
public static <P extends FailoverProxyProvider<?>>
void setFailoverConfigurations(
Configuration conf, String logicalName,
Iterable<String> nnAddresses, Class<P> classFPP) {
List<String> nnids = new ArrayList<String>();
int i = 0;
for (String address : nnAddresses) {
@ -227,8 +283,8 @@ public static void setFailoverConfigurations(Configuration conf, String logicalN
conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
Joiner.on(',').join(nnids));
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
ConfiguredFailoverProxyProvider.class.getName());
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + logicalName, classFPP.getName());
conf.set("fs.defaultFS", "hdfs://" + logicalName);
}

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test consistency of reads while accessing an ObserverNode.
* The tests are based on traditional (non fast path) edits tailing.
*/
public class TestConsistentReadsObserver {
public static final Logger LOG =
LoggerFactory.getLogger(TestConsistentReadsObserver.class.getName());
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private final Path testPath= new Path("/TestConsistentReadsObserver");
@BeforeClass
public static void startUpCluster() throws Exception {
conf = new Configuration();
// disable block scanner
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
// disable fast tailing here because this test's assertions are based on the
// timing of explicitly called rollEditLogAndTail. Although this means this
// test takes some time to run
// TODO: revisit if there is a better way.
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
// disable fast tailing so that coordination takes time.
conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
dfsCluster = qjmhaCluster.getDfsCluster();
}
@Before
public void setUp() throws Exception {
setObserverRead(true);
}
@After
public void cleanUp() throws IOException {
dfs.delete(testPath, true);
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
@Test
public void testMsyncSimple() throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
Thread reader = new Thread(() -> {
try {
// this read will block until roll and tail edits happen.
dfs.getFileStatus(testPath);
readStatus.set(1);
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
// the reader is still blocking, not succeeded yet.
assertEquals(0, readStatus.get());
dfsCluster.rollEditLogAndTail(0);
// wait a while for all the change to be done
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
// the reader should have succeed.
assertEquals(1, readStatus.get());
}
// @Ignore("Move to another test file")
@Test
public void testUncoordinatedCall() throws Exception {
// make a write call so that client will be ahead of
// observer for now.
dfs.mkdir(testPath, FsPermission.getDefault());
// a status flag, initialized to 0, after reader finished, this will be
// updated to 1, -1 on error
AtomicInteger readStatus = new AtomicInteger(0);
// create a separate thread to make a blocking read.
Thread reader = new Thread(() -> {
try {
// this read call will block until server state catches up. But due to
// configuration, this will take a very long time.
dfs.getClient().getFileInfo("/");
readStatus.set(1);
fail("Should have been interrupted before getting here.");
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
long before = Time.now();
dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
long after = Time.now();
// should succeed immediately, because datanodeReport is marked an
// uncoordinated call, and will not be waiting for server to catch up.
assertTrue(after - before < 200);
// by this time, reader thread should still be blocking, so the status not
// updated
assertEquals(0, readStatus.get());
Thread.sleep(5000);
// reader thread status should still be unchanged after 5 sec...
assertEquals(0, readStatus.get());
// and the reader thread is not dead, so it must be still waiting
assertEquals(Thread.State.WAITING, reader.getState());
reader.interrupt();
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
}
private static void setObserverRead(boolean flag) throws Exception {
dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag);
}
}

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests multiple ObserverNodes.
*/
public class TestMultiObserverNode {
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private final Path testPath= new Path("/TestMultiObserverNode");
@BeforeClass
public static void startUpCluster() throws Exception {
conf = new Configuration();
// disable block scanner
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.setTimeDuration(
DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2);
dfsCluster = qjmhaCluster.getDfsCluster();
dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, true);
}
@After
public void cleanUp() throws IOException {
dfs.delete(testPath, true);
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
@Test
public void testObserverFailover() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2, 3);
// Transition observer #2 to standby, request should go to the #3.
dfsCluster.transitionToStandby(2);
dfs.getFileStatus(testPath);
assertSentTo(3);
// Transition observer #3 to standby, request should go to active
dfsCluster.transitionToStandby(3);
dfs.getFileStatus(testPath);
assertSentTo(0);
// Transition #2 back to observer, request should go to #2
dfsCluster.transitionToObserver(2);
dfs.getFileStatus(testPath);
assertSentTo(2);
// Transition #3 back to observer, request should go to either #2 or #3
dfsCluster.transitionToObserver(3);
dfs.getFileStatus(testPath);
assertSentTo(2, 3);
}
@Test
public void testMultiObserver() throws Exception {
Path testPath2 = new Path(testPath, "test2");
Path testPath3 = new Path(testPath, "test3");
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2, 3);
dfs.mkdir(testPath2, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
// Shutdown first observer, request should go to the second one
dfsCluster.shutdownNameNode(2);
dfs.listStatus(testPath2);
assertSentTo(3);
// Restart the first observer
dfsCluster.restartNameNode(2);
dfs.listStatus(testPath);
assertSentTo(3);
dfsCluster.transitionToObserver(2);
dfs.listStatus(testPath);
assertSentTo(2, 3);
dfs.mkdir(testPath3, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
// Now shutdown the second observer, request should go to the first one
dfsCluster.shutdownNameNode(3);
dfs.listStatus(testPath3);
assertSentTo(2);
// Shutdown both, request should go to active
dfsCluster.shutdownNameNode(2);
dfs.listStatus(testPath3);
assertSentTo(0);
dfsCluster.restartNameNode(2);
dfsCluster.transitionToObserver(2);
dfsCluster.restartNameNode(3);
dfsCluster.transitionToObserver(3);
}
private void assertSentTo(int... nnIndices) throws IOException {
assertTrue("Request was not sent to any of the expected namenodes.",
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));
}
}

View File

@ -17,83 +17,94 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
// Main unit tests for ObserverNode
/**
* Test main functionality of ObserverNode.
*/
public class TestObserverNode {
private Configuration conf;
private MiniQJMHACluster qjmhaCluster;
private MiniDFSCluster dfsCluster;
private NameNode[] namenodes;
private Path testPath;
private Path testPath2;
private Path testPath3;
public static final Logger LOG =
LoggerFactory.getLogger(TestObserverNode.class.getName());
/** These are set in each individual test case */
private DistributedFileSystem dfs;
private ObserverReadProxyProvider<?> provider;
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
@Before
public void setUp() throws Exception {
private final Path testPath= new Path("/TestObserverNode");
@BeforeClass
public static void startUpCluster() throws Exception {
conf = new Configuration();
// disable block scanner
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.setTimeDuration(
DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
testPath = new Path("/test");
testPath2 = new Path("/test2");
testPath3 = new Path("/test3");
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
dfsCluster = qjmhaCluster.getDfsCluster();
}
@Before
public void setUp() throws Exception {
setObserverRead(true);
}
@After
public void cleanUp() throws IOException {
dfs.delete(testPath, true);
assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
getServiceState(dfsCluster.getNameNode(0)));
assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
getServiceState(dfsCluster.getNameNode(1)));
assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
getServiceState(dfsCluster.getNameNode(2)));
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
@ -101,13 +112,12 @@ public void cleanUp() throws IOException {
@Test
public void testSimpleRead() throws Exception {
setUpCluster(1);
setObserverRead(true);
Path testPath2 = new Path(testPath, "test2");
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
rollEditLogAndTail(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
@ -117,7 +127,7 @@ public void testSimpleRead() throws Exception {
@Test
public void testFailover() throws Exception {
setUpCluster(1);
Path testPath2 = new Path(testPath, "test2");
setObserverRead(false);
dfs.mkdir(testPath, FsPermission.getDefault());
@ -127,23 +137,26 @@ public void testFailover() throws Exception {
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(1);
dfsCluster.waitActive();
dfsCluster.waitActive(1);
dfs.mkdir(testPath2, FsPermission.getDefault());
assertSentTo(1);
dfs.getFileStatus(testPath);
assertSentTo(1);
dfsCluster.transitionToStandby(1);
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
}
@Test
public void testDoubleFailover() throws Exception {
setUpCluster(1);
setObserverRead(true);
Path testPath2 = new Path(testPath, "test2");
Path testPath3 = new Path(testPath, "test3");
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
rollEditLogAndTail(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
dfs.mkdir(testPath2, FsPermission.getDefault());
@ -153,7 +166,7 @@ public void testDoubleFailover() throws Exception {
dfsCluster.transitionToActive(1);
dfsCluster.waitActive(1);
rollEditLogAndTail(1);
dfsCluster.rollEditLogAndTail(1);
dfs.getFileStatus(testPath2);
assertSentTo(2);
dfs.mkdir(testPath3, FsPermission.getDefault());
@ -163,51 +176,17 @@ public void testDoubleFailover() throws Exception {
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
rollEditLogAndTail(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath3);
assertSentTo(2);
dfs.delete(testPath3, false);
assertSentTo(0);
}
@Test
public void testObserverFailover() throws Exception {
setUpCluster(2);
setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault());
rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentToAny(2, 3);
// Transition observer #2 to standby, request should go to the #3.
dfsCluster.transitionToStandby(2);
dfs.getFileStatus(testPath);
assertSentTo(3);
// Transition observer #3 to standby, request should go to active
dfsCluster.transitionToStandby(3);
dfs.getFileStatus(testPath);
assertSentTo(0);
// Transition #2 back to observer, request should go to #2
dfsCluster.transitionToObserver(2);
dfs.getFileStatus(testPath);
assertSentTo(2);
// Transition #3 back to observer, request should go to either #2 or #3
dfsCluster.transitionToObserver(3);
dfs.getFileStatus(testPath);
assertSentToAny(2, 3);
}
@Test
public void testObserverShutdown() throws Exception {
setUpCluster(1);
setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault());
rollEditLogAndTail(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
@ -228,18 +207,14 @@ public void testObserverShutdown() throws Exception {
@Test
public void testObserverFailOverAndShutdown() throws Exception {
setUpCluster(1);
// Test the case when there is a failover before ONN shutdown
setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault());
rollEditLogAndTail(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(1);
dfsCluster.waitActive();
dfsCluster.waitActive(1);
// Shutdown the observer - requests should go to active
dfsCluster.shutdownNameNode(2);
@ -257,54 +232,14 @@ public void testObserverFailOverAndShutdown() throws Exception {
// the second will properly go to the observer
dfs.getFileStatus(testPath);
assertSentTo(2);
}
@Test
public void testMultiObserver() throws Exception {
setUpCluster(2);
setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentToAny(2, 3);
dfs.mkdir(testPath2, FsPermission.getDefault());
rollEditLogAndTail(0);
// Shutdown first observer, request should go to the second one
dfsCluster.shutdownNameNode(2);
dfs.listStatus(testPath2);
assertSentTo(3);
// Restart the first observer
dfsCluster.restartNameNode(2);
dfs.listStatus(testPath);
assertSentTo(3);
dfsCluster.transitionToObserver(2);
dfs.listStatus(testPath);
assertSentToAny(2, 3);
dfs.mkdir(testPath3, FsPermission.getDefault());
rollEditLogAndTail(0);
// Now shutdown the second observer, request should go to the first one
dfsCluster.shutdownNameNode(3);
dfs.listStatus(testPath3);
assertSentTo(2);
// Shutdown both, request should go to active
dfsCluster.shutdownNameNode(2);
dfs.listStatus(testPath3);
assertSentTo(0);
dfsCluster.transitionToStandby(1);
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
}
@Test
public void testBootstrap() throws Exception {
setUpCluster(1);
for (URI u : dfsCluster.getNameDirs(2)) {
File dir = new File(u.getPath());
assertTrue(FileUtil.fullyDelete(dir));
@ -323,20 +258,12 @@ public void testBootstrap() throws Exception {
*/
@Test
public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
setUpCluster(1);
setObserverRead(true);
// Avoid starting DNs for the mini cluster.
BlockManager bmSpy = NameNodeAdapter.spyOnBlockManager(namenodes[0]);
doNothing().when(bmSpy)
.verifyReplication(anyString(), anyShort(), anyString());
// Create a new file - the request should go to active.
dfs.createNewFile(testPath);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
rollEditLogAndTail(0);
dfs.open(testPath);
dfsCluster.rollEditLogAndTail(0);
dfs.open(testPath).close();
assertSentTo(2);
// Set observer to safe mode.
@ -345,7 +272,8 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
// Mock block manager for observer to generate some fake blocks which
// will trigger the (retriable) safe mode exception.
final DatanodeInfo[] empty = {};
bmSpy = NameNodeAdapter.spyOnBlockManager(namenodes[2]);
BlockManager bmSpy =
NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
doAnswer((invocation) -> {
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
LocatedBlock fakeBlock = new LocatedBlock(b, empty);
@ -357,158 +285,23 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
// Open the file again - it should throw retriable exception and then
// failover to active.
dfs.open(testPath);
dfs.open(testPath).close();
assertSentTo(0);
// Remove safe mode on observer, request should still go to it.
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
dfs.open(testPath);
dfs.open(testPath).close();
assertSentTo(2);
Mockito.reset(bmSpy);
}
// TODO this does not currently work because fetching the service state from
// e.g. the StandbyNameNode also waits for the transaction ID to catch up.
// This is disabled pending HDFS-13872 and HDFS-13749.
@Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed")
@Test
public void testMsyncSimple() throws Exception {
// disable fast path here because this test's assertions are based on the
// timing of explicitly called rollEditLogAndTail. Although this means this
// test takes some time to run
// TODO: revisit if there is a better way.
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS);
conf.setTimeDuration(
DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS);
setUpCluster(1);
setObserverRead(true);
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
Thread reader = new Thread(() -> {
try {
// this read will block until roll and tail edits happen.
dfs.getFileStatus(testPath);
readStatus.set(1);
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
// the reader is still blocking, not succeeded yet.
assertEquals(0, readStatus.get());
rollEditLogAndTail(0);
// wait a while for all the change to be done
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
// the reader should have succeed.
assertEquals(1, readStatus.get());
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
}
@Test
public void testUncoordinatedCall() throws Exception {
// disable fast tailing so that coordination takes time.
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
conf.setTimeDuration(
DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
setUpCluster(1);
setObserverRead(true);
// make a write call so that client will be ahead of
// observer for now.
dfs.mkdir(testPath, FsPermission.getDefault());
// a status flag, initialized to 0, after reader finished, this will be
// updated to 1, -1 on error
AtomicInteger readStatus = new AtomicInteger(0);
// create a separate thread to make a blocking read.
Thread reader = new Thread(() -> {
try {
// this read call will block until server state catches up. But due to
// configuration, this will take a very long time.
dfs.getClient().getFileInfo("/");
readStatus.set(1);
fail("Should have been interrupted before getting here.");
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
long before = System.currentTimeMillis();
dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
long after = System.currentTimeMillis();
// should succeed immediately, because datanodeReport is marked an
// uncoordinated call, and will not be waiting for server to catch up.
assertTrue(after - before < 200);
// by this time, reader thread should still be blocking, so the status not
// updated
assertEquals(0, readStatus.get());
Thread.sleep(5000);
// reader thread status should still be unchanged after 5 sec...
assertEquals(0, readStatus.get());
// and the reader thread is not dead, so it must be still waiting
assertEquals(Thread.State.WAITING, reader.getState());
reader.interrupt();
}
private void setUpCluster(int numObservers) throws Exception {
qjmhaCluster = new MiniQJMHACluster.Builder(conf)
.setNumNameNodes(2 + numObservers)
.build();
dfsCluster = qjmhaCluster.getDfsCluster();
namenodes = new NameNode[2 + numObservers];
for (int i = 0; i < namenodes.length; i++) {
namenodes[i] = dfsCluster.getNameNode(i);
}
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
for (int i = 0; i < numObservers; i++) {
dfsCluster.transitionToObserver(2 + i);
}
}
private void assertSentTo(int nnIdx) {
assertSentToAny(nnIdx);
}
private void assertSentToAny(int... nnIndices) {
FailoverProxyProvider.ProxyInfo<?> pi = provider.getLastProxy();
for (int nnIdx : nnIndices) {
if (pi.proxyInfo.equals(
dfsCluster.getNameNode(nnIdx).getNameNodeAddress().toString())) {
return;
}
}
fail("Request was not sent to any of the expected namenodes");
}
private void setObserverRead(boolean flag) throws Exception {
dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, 0);
RetryInvocationHandler<?> handler =
(RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode());
provider = (ObserverReadProxyProvider<?>) handler.getProxyProvider();
provider.setObserverReadEnabled(flag);
}
private void rollEditLogAndTail(int indexForActiveNN) throws Exception {
dfsCluster.getNameNode(indexForActiveNN).getRpcServer().rollEditLog();
for (int i = 2; i < namenodes.length; i++) {
dfsCluster.getNameNode(i).getNamesystem().getEditLogTailer()
.doTailEdits();
}
private static void setObserverRead(boolean flag) throws Exception {
dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag);
}
}