diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt
new file mode 100644
index 0000000000..7a345c63b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt
@@ -0,0 +1,7 @@
+Changes for HDFS-3042 branch.
+
+This change list will be merged into the trunk CHANGES.txt when the HDFS-3042
+branch is merged.
+------------------------------
+
+HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 301d302825..350796d331 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -5,6 +5,9 @@
+
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index b87c59748b..a410cc0bad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -110,6 +110,33 @@
ant
provided
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.2
+
+
+
+ junit
+ junit
+
+
+ com.sun.jdmk
+ jmxtools
+
+
+ com.sun.jmx
+ jmxri
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.2
+ test-jar
+ test
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index fb409f3170..1b386b534a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -30,6 +30,7 @@ function print_usage(){
echo " namenode -format format the DFS filesystem"
echo " secondarynamenode run the DFS secondary namenode"
echo " namenode run the DFS namenode"
+ echo " zkfc run the ZK Failover Controller daemon"
echo " datanode run a DFS datanode"
echo " dfsadmin run a DFS admin client"
echo " haadmin run a DFS HA admin client"
@@ -71,6 +72,9 @@ fi
if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
+elif [ "$COMMAND" = "zkfc" ] ; then
+ CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif [ "$COMMAND" = "secondarynamenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
new file mode 100644
index 0000000000..e04159fb4d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceTarget;
+import org.apache.hadoop.ha.ZKFailoverController;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+@InterfaceAudience.Private
+public class DFSZKFailoverController extends ZKFailoverController {
+
+ private static final Log LOG =
+ LogFactory.getLog(DFSZKFailoverController.class);
+ private NNHAServiceTarget localTarget;
+ private Configuration localNNConf;
+
+ @Override
+ protected HAServiceTarget dataToTarget(byte[] data) {
+ ActiveNodeInfo proto;
+ try {
+ proto = ActiveNodeInfo.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Invalid data in ZK: " +
+ StringUtils.byteToHexString(data));
+ }
+ NNHAServiceTarget ret = new NNHAServiceTarget(
+ getConf(), proto.getNameserviceId(), proto.getNamenodeId());
+ InetSocketAddress addressFromProtobuf = new InetSocketAddress(
+ proto.getHostname(), proto.getPort());
+
+ if (!addressFromProtobuf.equals(ret.getAddress())) {
+ throw new RuntimeException("Mismatched address stored in ZK for " +
+ ret + ": Stored protobuf was " + proto + ", address from our own " +
+ "configuration for this NameNode was " + ret.getAddress());
+ }
+ return ret;
+ }
+
+ @Override
+ protected byte[] targetToData(HAServiceTarget target) {
+ InetSocketAddress addr = target.getAddress();
+ return ActiveNodeInfo.newBuilder()
+ .setHostname(addr.getHostName())
+ .setPort(addr.getPort())
+ .setNameserviceId(localTarget.getNameServiceId())
+ .setNamenodeId(localTarget.getNameNodeId())
+ .build()
+ .toByteArray();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ // Use HdfsConfiguration here to force hdfs-site.xml to load
+ localNNConf = new HdfsConfiguration(conf);
+
+ String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+
+ if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
+ throw new HadoopIllegalArgumentException(
+ "HA is not enabled for this namenode.");
+ }
+ String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
+ NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
+
+ localTarget = new NNHAServiceTarget(localNNConf, nsId, nnId);
+
+ super.setConf(localNNConf);
+ LOG.info("Failover controller configured for NameNode " +
+ nsId + "." + nnId);
+ }
+
+ @Override
+ public HAServiceTarget getLocalTarget() {
+ Preconditions.checkState(localTarget != null,
+ "setConf() should have already been called");
+ return localTarget;
+ }
+
+ public static void main(String args[])
+ throws Exception {
+ System.exit(ToolRunner.run(
+ new DFSZKFailoverController(), args));
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
index 9e8c239e7e..5c7748a667 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
@@ -20,11 +20,11 @@
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetUtils;
@@ -38,11 +38,13 @@ public class NNHAServiceTarget extends HAServiceTarget {
private final InetSocketAddress addr;
private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError;
+ private final String nnId;
+ private final String nsId;
- public NNHAServiceTarget(HdfsConfiguration conf,
+ public NNHAServiceTarget(Configuration localNNConf,
String nsId, String nnId) {
String serviceAddr =
- DFSUtil.getNamenodeServiceAddr(conf, nsId, nnId);
+ DFSUtil.getNamenodeServiceAddr(localNNConf, nsId, nnId);
if (serviceAddr == null) {
throw new IllegalArgumentException(
"Unable to determine service address for namenode '" + nnId + "'");
@@ -50,10 +52,12 @@ public NNHAServiceTarget(HdfsConfiguration conf,
this.addr = NetUtils.createSocketAddr(serviceAddr,
NameNode.DEFAULT_PORT);
try {
- this.fencer = NodeFencer.create(conf);
+ this.fencer = NodeFencer.create(localNNConf);
} catch (BadFencingConfigurationException e) {
this.fenceConfigError = e;
}
+ this.nnId = nnId;
+ this.nsId = nsId;
}
/**
@@ -69,6 +73,10 @@ public void checkFencingConfigured() throws BadFencingConfigurationException {
if (fenceConfigError != null) {
throw fenceConfigError;
}
+ if (fencer == null) {
+ throw new BadFencingConfigurationException(
+ "No fencer configured for " + this);
+ }
}
@Override
@@ -81,4 +89,11 @@ public String toString() {
return "NameNode at " + addr;
}
+ public String getNameServiceId() {
+ return this.nsId;
+ }
+
+ public String getNameNodeId() {
+ return this.nnId;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto
new file mode 100644
index 0000000000..7836dbe5f4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hdfs.server.namenode.ha.proto";
+option java_outer_classname = "HAZKInfoProtos";
+
+message ActiveNodeInfo {
+ required string nameserviceId = 1;
+ required string namenodeId = 2;
+
+ required string hostname = 3;
+ required int32 port = 4;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
new file mode 100644
index 0000000000..5b4cfed667
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.NodeFencer;
+import org.apache.hadoop.ha.ZKFailoverController;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.DFSZKFailoverController;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import com.google.common.base.Supplier;
+
+public class TestDFSZKFailoverController extends ClientBase {
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private TestContext ctx;
+ private ZKFCThread thr1, thr2;
+ private FileSystem fs;
+
+ @Override
+ public void setUp() throws Exception {
+ // build.test.dir is used by zookeeper
+ new File(System.getProperty("build.test.dir", "build")).mkdirs();
+ super.setUp();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new Configuration();
+ conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+ conf.set(NodeFencer.CONF_METHODS_KEY,
+ AlwaysSucceedFencer.class.getName());
+
+ MiniDFSNNTopology topology = new MiniDFSNNTopology()
+ .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+ .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
+ .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(topology)
+ .numDataNodes(0)
+ .build();
+ cluster.waitActive();
+
+ ctx = new TestContext();
+ ctx.addThread(thr1 = new ZKFCThread(ctx, 0));
+ assertEquals(0, thr1.zkfc.run(new String[]{"-formatZK"}));
+
+ thr1.start();
+ waitForHAState(0, HAServiceState.ACTIVE);
+
+ ctx.addThread(thr2 = new ZKFCThread(ctx, 1));
+ thr2.start();
+
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ cluster.shutdown();
+
+ if (thr1 != null) {
+ thr1.interrupt();
+ }
+ if (thr2 != null) {
+ thr2.interrupt();
+ }
+ if (ctx != null) {
+ ctx.stop();
+ }
+ }
+
+ /**
+ * Test that automatic failover is triggered by shutting the
+ * active NN down.
+ */
+ @Test(timeout=30000)
+ public void testFailoverAndBackOnNNShutdown() throws Exception {
+ Path p1 = new Path("/dir1");
+ Path p2 = new Path("/dir2");
+
+ // Write some data on the first NN
+ fs.mkdirs(p1);
+ // Shut it down, causing automatic failover
+ cluster.shutdownNameNode(0);
+ // Data should still exist. Write some on the new NN
+ assertTrue(fs.exists(p1));
+ fs.mkdirs(p2);
+ assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
+ thr1.zkfc.getLocalTarget().getAddress());
+
+ // Start the first node back up
+ cluster.restartNameNode(0);
+ // This should have no effect -- the new node should be STANDBY.
+ waitForHAState(0, HAServiceState.STANDBY);
+ assertTrue(fs.exists(p1));
+ assertTrue(fs.exists(p2));
+ // Shut down the second node, which should failback to the first
+ cluster.shutdownNameNode(1);
+ waitForHAState(0, HAServiceState.ACTIVE);
+
+ // First node should see what was written on the second node while it was down.
+ assertTrue(fs.exists(p1));
+ assertTrue(fs.exists(p2));
+ assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
+ thr2.zkfc.getLocalTarget().getAddress());
+ }
+
+ private void waitForHAState(int nnidx, final HAServiceState state)
+ throws TimeoutException, InterruptedException {
+ final NameNode nn = cluster.getNameNode(nnidx);
+ GenericTestUtils.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ try {
+ return nn.getRpcServer().getServiceStatus().getState() == state;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }, 50, 5000);
+ }
+
+ /**
+ * Test-thread which runs a ZK Failover Controller corresponding
+ * to a given NameNode in the minicluster.
+ */
+ private class ZKFCThread extends TestingThread {
+ private final DFSZKFailoverController zkfc;
+
+ public ZKFCThread(TestContext ctx, int idx) {
+ super(ctx);
+ this.zkfc = new DFSZKFailoverController();
+ zkfc.setConf(cluster.getConfiguration(idx));
+ }
+
+ @Override
+ public void doWork() throws Exception {
+ try {
+ assertEquals(0, zkfc.run(new String[0]));
+ } catch (InterruptedException ie) {
+ // Interrupted by main thread, that's OK.
+ }
+ }
+ }
+
+}