HDFS-2808. HA: haadmin should use namenode ids. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1240600 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ec6961b39c
commit
7b913180be
@ -40,7 +40,7 @@ import com.google.common.collect.ImmutableMap;
|
||||
|
||||
/**
|
||||
* A command-line tool for making calls in the HAServiceProtocol.
|
||||
* For example,. this can be used to force a daemon to standby or active
|
||||
* For example,. this can be used to force a service to standby or active
|
||||
* mode, or to trigger a health-check.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@ -53,20 +53,20 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
private static Map<String, UsageInfo> USAGE =
|
||||
ImmutableMap.<String, UsageInfo>builder()
|
||||
.put("-transitionToActive",
|
||||
new UsageInfo("<host:port>", "Transitions the daemon into Active state"))
|
||||
new UsageInfo("<serviceId>", "Transitions the service into Active state"))
|
||||
.put("-transitionToStandby",
|
||||
new UsageInfo("<host:port>", "Transitions the daemon into Standby state"))
|
||||
new UsageInfo("<serviceId>", "Transitions the service into Standby state"))
|
||||
.put("-failover",
|
||||
new UsageInfo("[--"+FORCEFENCE+"] [--"+FORCEACTIVE+"] <host:port> <host:port>",
|
||||
"Failover from the first daemon to the second.\n" +
|
||||
new UsageInfo("[--"+FORCEFENCE+"] [--"+FORCEACTIVE+"] <serviceId> <serviceId>",
|
||||
"Failover from the first service to the second.\n" +
|
||||
"Unconditionally fence services if the "+FORCEFENCE+" option is used.\n" +
|
||||
"Try to failover to the target service even if it is not ready if the " +
|
||||
FORCEACTIVE + " option is used."))
|
||||
.put("-getServiceState",
|
||||
new UsageInfo("<host:port>", "Returns the state of the daemon"))
|
||||
new UsageInfo("<serviceId>", "Returns the state of the service"))
|
||||
.put("-checkHealth",
|
||||
new UsageInfo("<host:port>",
|
||||
"Requests that the daemon perform a health check.\n" +
|
||||
new UsageInfo("<serviceId>",
|
||||
"Requests that the service perform a health check.\n" +
|
||||
"The HAAdmin tool will exit with a non-zero exit code\n" +
|
||||
"if the check fails."))
|
||||
.put("-help",
|
||||
@ -74,11 +74,15 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
.build();
|
||||
|
||||
/** Output stream for errors, for use in tests */
|
||||
PrintStream errOut = System.err;
|
||||
protected PrintStream errOut = System.err;
|
||||
PrintStream out = System.out;
|
||||
|
||||
private static void printUsage(PrintStream errOut) {
|
||||
errOut.println("Usage: java HAAdmin");
|
||||
protected String getUsageString() {
|
||||
return "Usage: HAAdmin";
|
||||
}
|
||||
|
||||
protected void printUsage(PrintStream errOut) {
|
||||
errOut.println(getUsageString());
|
||||
for (Map.Entry<String, UsageInfo> e : USAGE.entrySet()) {
|
||||
String cmd = e.getKey();
|
||||
UsageInfo usage = e.getValue();
|
||||
@ -94,7 +98,7 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
if (usage == null) {
|
||||
throw new RuntimeException("No usage for cmd " + cmd);
|
||||
}
|
||||
errOut.println("Usage: java HAAdmin [" + cmd + " " + usage.args + "]");
|
||||
errOut.println("Usage: HAAdmin [" + cmd + " " + usage.args + "]");
|
||||
}
|
||||
|
||||
private int transitionToActive(final String[] argv)
|
||||
@ -171,8 +175,10 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
return -1;
|
||||
}
|
||||
|
||||
InetSocketAddress addr1 = NetUtils.createSocketAddr(args[0]);
|
||||
InetSocketAddress addr2 = NetUtils.createSocketAddr(args[1]);
|
||||
InetSocketAddress addr1 =
|
||||
NetUtils.createSocketAddr(getServiceAddr(args[0]));
|
||||
InetSocketAddress addr2 =
|
||||
NetUtils.createSocketAddr(getServiceAddr(args[1]));
|
||||
HAServiceProtocol proto1 = getProtocol(args[0]);
|
||||
HAServiceProtocol proto2 = getProtocol(args[1]);
|
||||
|
||||
@ -219,11 +225,20 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a proxy to the specified target host:port.
|
||||
* Return the serviceId as is, we are assuming it was
|
||||
* given as a service address of form <host:ipcport>.
|
||||
*/
|
||||
protected HAServiceProtocol getProtocol(String target)
|
||||
protected String getServiceAddr(String serviceId) {
|
||||
return serviceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a proxy to the specified target service.
|
||||
*/
|
||||
protected HAServiceProtocol getProtocol(String serviceId)
|
||||
throws IOException {
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(target);
|
||||
String serviceAddr = getServiceAddr(serviceId);
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(serviceAddr);
|
||||
return (HAServiceProtocol)RPC.getProxy(
|
||||
HAServiceProtocol.class, HAServiceProtocol.versionID,
|
||||
addr, getConf());
|
||||
@ -231,6 +246,15 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
|
||||
@Override
|
||||
public int run(String[] argv) throws Exception {
|
||||
try {
|
||||
return runCmd(argv);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
errOut.println("Illegal argument: " + iae.getMessage());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
protected int runCmd(String[] argv) throws Exception {
|
||||
if (argv.length < 1) {
|
||||
printUsage(errOut);
|
||||
return -1;
|
||||
@ -244,7 +268,7 @@ public abstract class HAAdmin extends Configured implements Tool {
|
||||
printUsage(errOut);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
if ("-transitionToActive".equals(cmd)) {
|
||||
return transitionToActive(argv);
|
||||
} else if ("-transitionToStandby".equals(cmd)) {
|
||||
|
@ -54,7 +54,7 @@ import com.google.common.collect.Lists;
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class NodeFencer {
|
||||
static final String CONF_METHODS_KEY =
|
||||
public static final String CONF_METHODS_KEY =
|
||||
"dfs.namenode.ha.fencing.methods";
|
||||
|
||||
private static final String CLASS_RE = "([a-zA-Z0-9\\.\\$]+)";
|
||||
|
@ -26,7 +26,6 @@ import java.io.PrintStream;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -88,107 +87,12 @@ public class TestHAAdmin {
|
||||
assertEquals(-1, runTool("-failover", "foo:1234"));
|
||||
assertOutputContains("failover: incorrect arguments");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testHelp() throws Exception {
|
||||
assertEquals(-1, runTool("-help"));
|
||||
assertEquals(0, runTool("-help", "transitionToActive"));
|
||||
assertOutputContains("Transitions the daemon into Active");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransitionToActive() throws Exception {
|
||||
assertEquals(0, runTool("-transitionToActive", "foo:1234"));
|
||||
Mockito.verify(mockProtocol).transitionToActive();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransitionToStandby() throws Exception {
|
||||
assertEquals(0, runTool("-transitionToStandby", "foo:1234"));
|
||||
Mockito.verify(mockProtocol).transitionToStandby();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithNoFencerConfigured() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
assertEquals(-1, runTool("-failover", "foo:1234", "bar:5678"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithFencerConfigured() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
|
||||
tool.setConf(conf);
|
||||
assertEquals(0, runTool("-failover", "foo:1234", "bar:5678"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithFencerConfiguredAndForce() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
|
||||
tool.setConf(conf);
|
||||
assertEquals(0, runTool("-failover", "foo:1234", "bar:5678", "--forcefence"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithForceActive() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
|
||||
tool.setConf(conf);
|
||||
assertEquals(0, runTool("-failover", "foo:1234", "bar:5678", "--forceactive"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithInvalidFenceArg() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
|
||||
tool.setConf(conf);
|
||||
assertEquals(-1, runTool("-failover", "foo:1234", "bar:5678", "notforcefence"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithFenceButNoFencer() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
assertEquals(-1, runTool("-failover", "foo:1234", "bar:5678", "--forcefence"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverWithFenceAndBadFencer() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NodeFencer.CONF_METHODS_KEY, "foobar!");
|
||||
tool.setConf(conf);
|
||||
assertEquals(-1, runTool("-failover", "foo:1234", "bar:5678", "--forcefence"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceFenceOptionListedBeforeArgs() throws Exception {
|
||||
Mockito.doReturn(HAServiceState.STANDBY).when(mockProtocol).getServiceState();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
|
||||
tool.setConf(conf);
|
||||
assertEquals(0, runTool("-failover", "--forcefence", "foo:1234", "bar:5678"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetServiceState() throws Exception {
|
||||
assertEquals(0, runTool("-getServiceState", "foo:1234"));
|
||||
Mockito.verify(mockProtocol).getServiceState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckHealth() throws Exception {
|
||||
assertEquals(0, runTool("-checkHealth", "foo:1234"));
|
||||
Mockito.verify(mockProtocol).monitorHealth();
|
||||
|
||||
Mockito.doThrow(new HealthCheckFailedException("fake health check failure"))
|
||||
.when(mockProtocol).monitorHealth();
|
||||
assertEquals(-1, runTool("-checkHealth", "foo:1234"));
|
||||
assertOutputContains("Health check failed: fake health check failure");
|
||||
assertOutputContains("Transitions the service into Active");
|
||||
}
|
||||
|
||||
private Object runTool(String ... args) throws Exception {
|
||||
@ -199,5 +103,4 @@ public class TestHAAdmin {
|
||||
LOG.info("Output:\n" + errOutput);
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -166,3 +166,5 @@ HDFS-2874. Edit log should log to shared dirs before local dirs. (todd)
|
||||
HDFS-2890. DFSUtil#getSuffixIDs should skip unset configurations. (atm)
|
||||
|
||||
HDFS-2792. Make fsck work. (atm)
|
||||
|
||||
HDFS-2808. HA: haadmin should use namenode ids. (eli)
|
||||
|
@ -1042,4 +1042,39 @@ public class DFSUtil {
|
||||
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
|
||||
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
|
||||
}
|
||||
|
||||
/**
|
||||
* Map a logical namenode ID to its service address. Use the given
|
||||
* nameservice if specified, or the configured one if none is given.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nsId which nameservice nnId is a part of, optional
|
||||
* @param nnId the namenode ID to get the service addr for
|
||||
* @return the service addr, null if it could not be determined
|
||||
*/
|
||||
public static String getNamenodeServiceAddr(final Configuration conf,
|
||||
String nsId, String nnId) {
|
||||
|
||||
if (nsId == null) {
|
||||
Collection<String> nsIds = getNameServiceIds(conf);
|
||||
if (nsIds.size() != 1) {
|
||||
// No nameservice ID was given and more than one is configured
|
||||
return null;
|
||||
} else {
|
||||
nsId = nsIds.toArray(new String[1])[0];
|
||||
}
|
||||
}
|
||||
|
||||
String serviceAddrKey = concatSuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
|
||||
|
||||
String addrKey = concatSuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
|
||||
|
||||
String serviceRpcAddr = conf.get(serviceAddrKey);
|
||||
if (serviceRpcAddr == null) {
|
||||
serviceRpcAddr = conf.get(addrKey);
|
||||
}
|
||||
return serviceRpcAddr;
|
||||
}
|
||||
}
|
||||
|
@ -17,12 +17,16 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ha.HAAdmin;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
@ -30,10 +34,15 @@ import org.apache.hadoop.util.ToolRunner;
|
||||
* Class to extend HAAdmin to do a little bit of HDFS-specific configuration.
|
||||
*/
|
||||
public class DFSHAAdmin extends HAAdmin {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(DFSHAAdmin.class);
|
||||
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(DFSHAAdmin.class);
|
||||
|
||||
private String nameserviceId;
|
||||
|
||||
protected void setErrOut(PrintStream errOut) {
|
||||
this.errOut = errOut;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
if (conf != null) {
|
||||
@ -52,6 +61,54 @@ public class DFSHAAdmin extends HAAdmin {
|
||||
super.setConf(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to map the given namenode ID to its service address.
|
||||
*/
|
||||
@Override
|
||||
protected String getServiceAddr(String nnId) {
|
||||
HdfsConfiguration conf = (HdfsConfiguration)getConf();
|
||||
String serviceAddr =
|
||||
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, nnId);
|
||||
if (serviceAddr == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to determine service address for namenode '" + nnId + "'");
|
||||
}
|
||||
return serviceAddr;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getUsageString() {
|
||||
return "Usage: DFSHAAdmin [-ns <nameserviceId>]";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int runCmd(String[] argv) throws Exception {
|
||||
if (argv.length < 1) {
|
||||
printUsage(errOut);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
String cmd = argv[i++];
|
||||
|
||||
if ("-ns".equals(cmd)) {
|
||||
if (i == argv.length) {
|
||||
errOut.println("Missing nameservice ID");
|
||||
printUsage(errOut);
|
||||
return -1;
|
||||
}
|
||||
nameserviceId = argv[i++];
|
||||
if (i >= argv.length) {
|
||||
errOut.println("Missing command");
|
||||
printUsage(errOut);
|
||||
return -1;
|
||||
}
|
||||
argv = Arrays.copyOfRange(argv, i, argv.length);
|
||||
}
|
||||
|
||||
return super.runCmd(argv);
|
||||
}
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
int res = ToolRunner.run(new DFSHAAdmin(), argv);
|
||||
System.exit(res);
|
||||
|
@ -406,6 +406,53 @@ public class TestDFSUtil {
|
||||
assertEquals(NS1_NN2_HOST, map.get("ns1").get("ns1-nn2").toString());
|
||||
assertEquals(NS2_NN1_HOST, map.get("ns2").get("ns2-nn1").toString());
|
||||
assertEquals(NS2_NN2_HOST, map.get("ns2").get("ns2-nn2").toString());
|
||||
|
||||
assertEquals(NS1_NN1_HOST,
|
||||
DFSUtil.getNamenodeServiceAddr(conf, "ns1", "ns1-nn1"));
|
||||
assertEquals(NS1_NN2_HOST,
|
||||
DFSUtil.getNamenodeServiceAddr(conf, "ns1", "ns1-nn2"));
|
||||
assertEquals(NS2_NN1_HOST,
|
||||
DFSUtil.getNamenodeServiceAddr(conf, "ns2", "ns2-nn1"));
|
||||
|
||||
// No nameservice was given and we can't determine which to use
|
||||
// as two nameservices could share a namenode ID.
|
||||
assertEquals(null, DFSUtil.getNamenodeServiceAddr(conf, null, "ns1-nn1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getNameNodeServiceAddr() throws IOException {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
|
||||
// One nameservice with two NNs
|
||||
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
|
||||
final String NS1_NN1_HOST_SVC = "ns1-nn2.example.com:8021";
|
||||
final String NS1_NN2_HOST = "ns1-nn1.example.com:8020";
|
||||
final String NS1_NN2_HOST_SVC = "ns1-nn2.example.com:8021";
|
||||
|
||||
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1");
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, "ns1"),"nn1,nn2");
|
||||
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_HOST);
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST);
|
||||
|
||||
// The rpc address is used if no service address is defined
|
||||
assertEquals(NS1_NN1_HOST, DFSUtil.getNamenodeServiceAddr(conf, null, "nn1"));
|
||||
assertEquals(NS1_NN2_HOST, DFSUtil.getNamenodeServiceAddr(conf, null, "nn2"));
|
||||
|
||||
// A nameservice is specified explicitly
|
||||
assertEquals(NS1_NN1_HOST, DFSUtil.getNamenodeServiceAddr(conf, "ns1", "nn1"));
|
||||
assertEquals(null, DFSUtil.getNamenodeServiceAddr(conf, "invalid", "nn1"));
|
||||
|
||||
// The service addrs are used when they are defined
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_HOST_SVC);
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST_SVC);
|
||||
|
||||
assertEquals(NS1_NN1_HOST_SVC, DFSUtil.getNamenodeServiceAddr(conf, null, "nn1"));
|
||||
assertEquals(NS1_NN2_HOST_SVC, DFSUtil.getNamenodeServiceAddr(conf, null, "nn2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -57,7 +57,7 @@ public class TestCheckPointForSecurityTokens {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests save namepsace.
|
||||
* Tests save namespace.
|
||||
*/
|
||||
@Test
|
||||
public void testSaveNamespace() throws IOException {
|
||||
|
@ -861,7 +861,7 @@ public class TestCheckpoint extends TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests save namepsace.
|
||||
* Tests save namespace.
|
||||
*/
|
||||
public void testSaveNamespace() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
|
Loading…
x
Reference in New Issue
Block a user