diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt index 1663eee16c..107572ae49 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt @@ -24,3 +24,5 @@ HADOOP-7925. Add interface and update CLI to query current state to HADOOP-7932. Make client connection retries on socket time outs configurable. (Uma Maheswara Rao G via todd) + +HADOOP-7924. 
FailoverController for client-based configuration (eli) diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop index 4ca78972aa..e57ea31fbd 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop @@ -36,6 +36,7 @@ function print_usage(){ echo " classpath prints the class path needed to get the" echo " Hadoop jar and the required libraries" echo " daemonlog get/set the log level for each daemon" + echo " haadmin run a HA admin client" echo " or" echo " CLASSNAME run the class named CLASSNAME" echo "" @@ -95,6 +96,10 @@ case $COMMAND in CLASS=org.apache.hadoop.tools.HadoopArchives CLASSPATH=${CLASSPATH}:${TOOL_PATH} HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" + elif [ "$COMMAND" = "haadmin" ] ; then + CLASS=org.apache.hadoop.ha.HAAdmin + CLASSPATH=${CLASSPATH}:${TOOL_PATH} + HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" elif [[ "$COMMAND" = -* ]] ; then # class and package names cannot begin with a - echo "Error: No command named \`$COMMAND' was found. Perhaps you meant \`hadoop ${COMMAND#-}'" diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java new file mode 100644 index 0000000000..cc60de66a3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; + +/** + * The FailoverController is responsible for electing an active service + * on startup or when the current active is changing (e.g. due to failure), + * monitoring the health of a service, and performing a fail-over when a + * new active service is either manually selected by a user or elected. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FailoverController { + + private static final Log LOG = LogFactory.getLog(FailoverController.class); + + /** + * Perform pre-failover checks on the given service we plan to + * failover to, e.g. to prevent failing over to a service (e.g. due + * to it being inaccessible, already active, not healthy, etc.). 
+ * + * @param toSvc service to make active + * @param toSvcName name of service to make active + * @throws FailoverFailedException if we should avoid failover + */ + private static void preFailoverChecks(HAServiceProtocol toSvc, + String toSvcName) + throws FailoverFailedException { + HAServiceState toSvcState; + try { + toSvcState = toSvc.getServiceState(); + } catch (Exception e) { + String msg = "Unable to get service state for " + toSvcName; + LOG.error(msg, e); + throw new FailoverFailedException(msg, e); + } + if (!toSvcState.equals(HAServiceState.STANDBY)) { + throw new FailoverFailedException( + "Can't failover to an active service"); + } + try { + toSvc.monitorHealth(); + } catch (HealthCheckFailedException hce) { + throw new FailoverFailedException( + "Can't failover to an unhealthy service", hce); + } + // TODO(HA): ask toSvc if it's capable. Eg not in SM. + } + + /** + * Failover from service 1 to service 2. If the failover fails + * then try to failback. + * + * @param fromSvc currently active service + * @param fromSvcName name of currently active service + * @param toSvc service to make active + * @param toSvcName name of service to make active + * @throws FailoverFailedException if the failover fails + */ + public static void failover(HAServiceProtocol fromSvc, String fromSvcName, + HAServiceProtocol toSvc, String toSvcName) + throws FailoverFailedException { + preFailoverChecks(toSvc, toSvcName); + + // Try to make fromSvc standby + try { + fromSvc.transitionToStandby(); + } catch (ServiceFailedException sfe) { + LOG.warn("Unable to make " + fromSvcName + " standby (" + + sfe.getMessage() + ")"); + } catch (Exception e) { + LOG.warn("Unable to make " + fromSvcName + + " standby (unable to connect)", e); + // TODO(HA): fence fromSvc and unfence on failback + } + + // Try to make toSvc active + boolean failed = false; + Throwable cause = null; + try { + toSvc.transitionToActive(); + } catch (ServiceFailedException sfe) { + LOG.error("Unable to make 
" + toSvcName + " active (" + + sfe.getMessage() + "). Failing back"); + failed = true; + cause = sfe; + } catch (Exception e) { + LOG.error("Unable to make " + toSvcName + + " active (unable to connect). Failing back", e); + failed = true; + cause = e; + } + + // Try to failback if we failed to make toSvc active + if (failed) { + String msg = "Unable to failover to " + toSvcName; + try { + fromSvc.transitionToActive(); + } catch (ServiceFailedException sfe) { + msg = "Failback to " + fromSvcName + " failed (" + + sfe.getMessage() + ")"; + LOG.fatal(msg); + } catch (Exception e) { + msg = "Failback to " + fromSvcName + " failed (unable to connect)"; + LOG.fatal(msg); + } + throw new FailoverFailedException(msg, cause); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverFailedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverFailedException.java new file mode 100644 index 0000000000..09982b4f7e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverFailedException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ha; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Exception thrown to indicate service failover has failed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class FailoverFailedException extends Exception { + private static final long serialVersionUID = 1L; + + public FailoverFailedException(final String message) { + super(message); + } + + public FailoverFailedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java index fff82e83b4..2dc5c1f39a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java @@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Tool; @@ -46,6 +45,9 @@ public class HAAdmin extends Configured implements Tool { new UsageInfo("", "Transitions the daemon into Active state")) .put("-transitionToStandby", new UsageInfo("", "Transitions the daemon into Standby state")) + .put("-failover", + new UsageInfo(" ", + "Failover from the first daemon to the second")) .put("-getServiceState", new UsageInfo("", "Returns the state of the daemon")) .put("-checkHealth", @@ -94,7 +96,6 @@ private int transitionToActive(final String[] argv) return 0; } - private int transitionToStandby(final String[] argv) throws IOException, ServiceFailedException { if (argv.length != 2) { @@ -107,7 +108,27 @@ private int transitionToStandby(final String[] argv) 
proto.transitionToStandby(); return 0; } - + + private int failover(final String[] argv) + throws IOException, ServiceFailedException { + if (argv.length != 3) { + errOut.println("failover: incorrect number of arguments"); + printUsage(errOut, "-failover"); + return -1; + } + + HAServiceProtocol proto1 = getProtocol(argv[1]); + HAServiceProtocol proto2 = getProtocol(argv[2]); + try { + FailoverController.failover(proto1, argv[1], proto2, argv[2]); + out.println("Failover from "+argv[1]+" to "+argv[2]+" successful"); + } catch (FailoverFailedException ffe) { + errOut.println("Failover failed: " + ffe.getLocalizedMessage()); + return 1; + } + return 0; + } + private int checkHealth(final String[] argv) throws IOException, ServiceFailedException { if (argv.length != 2) { @@ -171,6 +192,8 @@ public int run(String[] argv) throws Exception { return transitionToActive(argv); } else if ("-transitionToStandby".equals(cmd)) { return transitionToStandby(argv); + } else if ("-failover".equals(cmd)) { + return failover(argv); } else if ("-getServiceState".equals(cmd)) { return getServiceState(argv); } else if ("-checkHealth".equals(cmd)) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthCheckFailedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthCheckFailedException.java index a73e4ef3c2..4d888be480 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthCheckFailedException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthCheckFailedException.java @@ -21,34 +21,17 @@ import org.apache.hadoop.classification.InterfaceStability; /** - * Exception thrown to indicate that health check of a service - * failed. + * Exception thrown to indicate that health check of a service failed. 
*/ @InterfaceAudience.Public @InterfaceStability.Evolving public class HealthCheckFailedException extends Exception { private static final long serialVersionUID = 1L; - /** - * Constructs exception with the specified detail message. - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). - */ public HealthCheckFailedException(final String message) { super(message); } - /** - * Constructs a new exception with the specified detail message and - * cause. - * - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) - */ public HealthCheckFailedException(String message, Throwable cause) { super(message, cause); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ServiceFailedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ServiceFailedException.java index e0f8cfc837..788a843073 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ServiceFailedException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ServiceFailedException.java @@ -30,27 +30,11 @@ public class ServiceFailedException extends Exception { private static final long serialVersionUID = 1L; - /** - * Constructs exception with the specified detail message. - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). - */ public ServiceFailedException(final String message) { super(message); } - /** - * Constructs a new exception with the specified detail message and - * cause. - * - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). 
- * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) - */ public ServiceFailedException(String message, Throwable cause) { - super(message, cause); + super(message, cause); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java new file mode 100644 index 0000000000..f4a6ff2427 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestFailoverController { + + private class DummyService implements HAServiceProtocol { + HAServiceState state; + + DummyService(HAServiceState state) { + this.state = state; + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return 0; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return null; + } + + @Override + public void monitorHealth() throws HealthCheckFailedException { + // Do nothing + } + + @Override + public void transitionToActive() throws ServiceFailedException { + state = HAServiceState.ACTIVE; + } + + @Override + public void transitionToStandby() throws ServiceFailedException { + state = HAServiceState.STANDBY; + } + + @Override + public HAServiceState getServiceState() { + return state; + } + } + + @Test + public void testFailoverAndFailback() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.ACTIVE); + DummyService svc2 = new DummyService(HAServiceState.STANDBY); + + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + assertEquals(HAServiceState.STANDBY, svc1.getServiceState()); + assertEquals(HAServiceState.ACTIVE, svc2.getServiceState()); + + FailoverController.failover(svc2, "svc2", svc1, "svc1"); + assertEquals(HAServiceState.ACTIVE, svc1.getServiceState()); + assertEquals(HAServiceState.STANDBY, 
svc2.getServiceState()); + } + + @Test + public void testFailoverFromStandbyToStandby() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.STANDBY); + DummyService svc2 = new DummyService(HAServiceState.STANDBY); + + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + assertEquals(HAServiceState.STANDBY, svc1.getServiceState()); + assertEquals(HAServiceState.ACTIVE, svc2.getServiceState()); + } + + @Test + public void testFailoverFromActiveToActive() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.ACTIVE); + DummyService svc2 = new DummyService(HAServiceState.ACTIVE); + + try { + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + fail("Can't failover to an already active service"); + } catch (FailoverFailedException ffe) { + // Expected + } + + assertEquals(HAServiceState.ACTIVE, svc1.getServiceState()); + assertEquals(HAServiceState.ACTIVE, svc2.getServiceState()); + } + + @Test + public void testFailoverToUnhealthyServiceFails() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.ACTIVE); + DummyService svc2 = new DummyService(HAServiceState.STANDBY) { + @Override + public void monitorHealth() throws HealthCheckFailedException { + throw new HealthCheckFailedException("Failed!"); + } + }; + + try { + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + fail("Failover to unhealthy service"); + } catch (FailoverFailedException ffe) { + // Expected + } + assertEquals(HAServiceState.ACTIVE, svc1.getServiceState()); + assertEquals(HAServiceState.STANDBY, svc2.getServiceState()); + } + + @Test + public void testFailoverFromFaultyServiceSucceeds() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.ACTIVE) { + @Override + public void transitionToStandby() throws ServiceFailedException { + throw new ServiceFailedException("Failed!"); + } + }; + DummyService svc2 = new DummyService(HAServiceState.STANDBY); + + try { + FailoverController.failover(svc1, "svc1", svc2, 
"svc2"); + } catch (FailoverFailedException ffe) { + fail("Faulty active prevented failover"); + } + // svc1 still thinks they're active, that's OK, we'll fence them + assertEquals(HAServiceState.ACTIVE, svc1.getServiceState()); + assertEquals(HAServiceState.ACTIVE, svc2.getServiceState()); + } + + private HAServiceProtocol getProtocol(String target) + throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr(target); + Configuration conf = new Configuration(); + // Lower the timeout so we quickly fail to connect + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + return (HAServiceProtocol)RPC.getProxy( + HAServiceProtocol.class, HAServiceProtocol.versionID, addr, conf); + } + + @Test + public void testFailoverFromNonExistantServiceSucceeds() throws Exception { + HAServiceProtocol svc1 = getProtocol("localhost:1234"); + DummyService svc2 = new DummyService(HAServiceState.STANDBY); + + try { + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + } catch (FailoverFailedException ffe) { + fail("Non-existant active prevented failover"); + } + + // Don't check svc1 (we can't reach it, but that's OK, we'll fence) + assertEquals(HAServiceState.ACTIVE, svc2.getServiceState()); + } + + @Test + public void testFailoverToNonExistantServiceFails() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.ACTIVE); + HAServiceProtocol svc2 = getProtocol("localhost:1234"); + + try { + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + fail("Failed over to a non-existant standby"); + } catch (FailoverFailedException ffe) { + // Expected + } + + assertEquals(HAServiceState.ACTIVE, svc1.getServiceState()); + } + + @Test + public void testFailoverToFaultyServiceFailsbackOK() throws Exception { + DummyService svc1 = spy(new DummyService(HAServiceState.ACTIVE)); + DummyService svc2 = new DummyService(HAServiceState.STANDBY) { + @Override + public void transitionToActive() throws ServiceFailedException { + 
throw new ServiceFailedException("Failed!"); + } + }; + + try { + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + fail("Failover to already active service"); + } catch (FailoverFailedException ffe) { + // Expected + } + + // svc1 went standby then back to active + verify(svc1).transitionToStandby(); + verify(svc1).transitionToActive(); + assertEquals(HAServiceState.ACTIVE, svc1.getServiceState()); + assertEquals(HAServiceState.STANDBY, svc2.getServiceState()); + } + + @Test + public void testFailbackToFaultyServiceFails() throws Exception { + DummyService svc1 = new DummyService(HAServiceState.ACTIVE) { + @Override + public void transitionToActive() throws ServiceFailedException { + throw new ServiceFailedException("Failed!"); + } + }; + DummyService svc2 = new DummyService(HAServiceState.STANDBY) { + @Override + public void transitionToActive() throws ServiceFailedException { + throw new ServiceFailedException("Failed!"); + } + }; + + try { + FailoverController.failover(svc1, "svc1", svc2, "svc2"); + fail("Failover to already active service"); + } catch (FailoverFailedException ffe) { + // Expected + } + + assertEquals(HAServiceState.STANDBY, svc1.getServiceState()); + assertEquals(HAServiceState.STANDBY, svc2.getServiceState()); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAAdmin.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAAdmin.java index b465029d47..ca3d9eccaf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAAdmin.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAAdmin.java @@ -100,6 +100,11 @@ public void testTransitionToStandby() throws Exception { Mockito.verify(mockProtocol).transitionToStandby(); } + @Test + public void testFailover() throws Exception { + assertEquals(0, runTool("-failover", "xxx", "yyy")); + } + @Test public void testGetServiceState() throws Exception { 
assertEquals(0, runTool("-getServiceState", "xxx"));